This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bb2e4a8a05 IGNITE-23748 Lock LWM when executing RO operation on data 
node (#4974)
bb2e4a8a05 is described below

commit bb2e4a8a0574442dff5baa497fb3e4fd49ff81dc
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Dec 30 11:47:08 2024 +0400

    IGNITE-23748 Lock LWM when executing RO operation on data node (#4974)
    
    * When executing an operation in an RO transaction (explicit or implicit), 
attempt to lock LWM on the data node where it's executed
    * Direct RO operations (which happen in implicit transactions) only lock 
LWM if they concern more than 1 key
    * If lock attempt fails, throw an exception with a specific error code
    * When cleaning up after an RO transaction had been finished, unlock LWM on 
each node where such a cleanup happens
    * For direct RO operations, unlock LWM right after the read has been done 
on the data node
    * When locking LWM for an explicit transaction, also register a hook to 
unlock LWM when the transaction coordinator leaves
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   6 +
 .../ignite/client/fakes/FakeInternalTable.java     |   4 +
 .../replication/ReadOnlyReplicaRequest.java        |  12 +
 .../ReadOnlyScanRetrieveBatchReplicaRequest.java   |   9 +-
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 .../ignite/internal/benchmark/SelectBenchmark.java |  26 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../org/apache/ignite/internal/TestWrappers.java   |  10 +
 .../internal/sql/sqllogic/ItSqlLogicTest.java      |   7 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   3 +-
 .../table/ItTransactionPrimaryChangeTest.java      |   9 +-
 .../internal/table/ItTransactionRecoveryTest.java  |   8 +-
 .../ignite/internal/table/InternalTable.java       |  45 ++-
 .../internal/table/distributed/TableManager.java   |   3 +-
 .../replicator/PartitionReplicaListener.java       | 133 ++++++++-
 .../distributed/storage/InternalTableImpl.java     |  12 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   4 +-
 ...itionReplicaListenerSortedIndexLockingTest.java |   4 +-
 .../replication/PartitionReplicaListenerTest.java  | 241 +++++++++++++---
 .../storage/InternalTableEstimatedSizeTest.java    |   4 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |   9 +-
 .../table/impl/DummyInternalTableImpl.java         |   3 +-
 modules/transactions/build.gradle                  |   4 +
 .../readonly/ItReadOnlyTxAndLowWatermarkTest.java  | 307 +++++++++++++++++++++
 .../FinishedTransactionBatchRequestHandler.java    |  17 +-
 .../tx/impl/RemotelyTriggeredResourceRegistry.java |   6 +-
 .../internal/tx/impl/ResourceVacuumManager.java    |  13 +-
 ...FinishedTransactionBatchRequestHandlerTest.java |  92 ++++++
 31 files changed, 911 insertions(+), 91 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 064e875447..91fa9cf2bb 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -358,6 +358,12 @@ public class ErrorGroups {
 
         /** Failure due to a stale operation of a completed transaction is 
detected. */
         public static final int TX_STALE_OPERATION_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 14);
+
+        /**
+         * Error occurred when trying to execute an operation in a read-only 
transaction on a node that has already destroyed data for
+         * read timestamp of the transaction.
+         */
+        public static final int TX_STALE_READ_ONLY_OPERATION_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 15);
     }
 
     /** Replicator error group. */
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 5d35fd6178..c0992d2624 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -147,6 +147,8 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
     public CompletableFuture<BinaryRow> get(
             BinaryRowEx keyRow,
             HybridTimestamp readTimestamp,
+            @Nullable UUID transactionId,
+            @Nullable UUID coordinatorId,
             ClusterNode recipientNode) {
         return null;
     }
@@ -179,6 +181,8 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
     public CompletableFuture<List<BinaryRow>> getAll(
             Collection<BinaryRowEx> keyRows,
             HybridTimestamp readTimestamp,
+            @Nullable UUID transactionId,
+            @Nullable UUID coordinatorId,
             ClusterNode recipientNode
     ) {
         return null;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
index 1acdbba89e..3e60ac9ac0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
@@ -17,12 +17,24 @@
 
 package org.apache.ignite.internal.partition.replicator.network.replication;
 
+import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Read only replica request.
  */
 public interface ReadOnlyReplicaRequest extends ReplicaRequest {
     HybridTimestamp readTimestamp();
+
+    /** ID of the transaction in which this request is made. */
+    // TODO: remove @Nullable after IGNITE-24120 is sorted out.
+    @Nullable
+    UUID transactionId();
+
+    /** Inconsistent ID of coordinator of transaction to which this operation 
belongs. */
+    // TODO: remove @Nullable after IGNITE-24120 is sorted out.
+    @Nullable
+    UUID coordinatorId();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
index 48773454e7..4ccf80b0a3 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
@@ -27,12 +27,11 @@ import 
org.apache.ignite.internal.replicator.message.TableAware;
  */
 
@Transferable(PartitionReplicationMessageGroup.RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
 public interface ReadOnlyScanRetrieveBatchReplicaRequest extends 
ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest, TableAware {
+    // TODO: remove override after @Nullable is removed from the 
super-interface method, see IGNITE-24120.
+    @Override
     UUID transactionId();
 
-    /**
-     * Get the transaction coordinator inconsistent ID.
-     *
-     * @return Transaction coordinator inconsistent ID.
-     */
+    // TODO: remove override after @Nullable is removed from the 
super-interface method, see IGNITE-24120.
+    @Override
     UUID coordinatorId();
 }
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 438074e10c..7a3666080a 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -130,6 +130,7 @@ enum class code : underlying_t {
     TX_PRIMARY_REPLICA_EXPIRED = 0x7000c,
     TX_ALREADY_FINISHED = 0x7000d,
     TX_STALE_OPERATION = 0x7000e,
+    TX_STALE_READ_ONLY_OPERATION = 0x7000f,
 
     // Replicator group. Group code: 8
     REPLICA_COMMON = 0x80001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index a57bf7101d..b3c480b373 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -202,6 +202,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::TX_PRIMARY_REPLICA_EXPIRED:
         case error::code::TX_ALREADY_FINISHED:
         case error::code::TX_STALE_OPERATION:
+        case error::code::TX_STALE_READ_ONLY_OPERATION:
             return sql_state::S25000_INVALID_TRANSACTION_STATE;
 
         // Replicator group. Group code: 8
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 7b5c75d863..9e7563c6c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -291,6 +291,9 @@ namespace Apache.Ignite
 
             /// <summary> TxStaleOperation error. </summary>
             public const int TxStaleOperation = (GroupCode << 16) | (14 & 
0xFFFF);
+
+            /// <summary> TxStaleReadOnlyOperation error. </summary>
+            public const int TxStaleReadOnlyOperation = (GroupCode << 16) | 
(15 & 0xFFFF);
         }
 
         /// <summary> Replicator errors. </summary>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
index 52de6ec493..67b4253d3f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
@@ -43,6 +43,10 @@ import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -81,6 +85,10 @@ public class SelectBenchmark extends 
AbstractMultiNodeBenchmark {
 
     private KeyValueView<Tuple, Tuple> keyValueView;
 
+    private IgniteTransactions transactions;
+
+    private final TransactionOptions readOnlyTransactionOptions = new 
TransactionOptions().readOnly(true);
+
     @Param({"1", "2", "3"})
     private int clusterSize;
 
@@ -101,6 +109,8 @@ public class SelectBenchmark extends 
AbstractMultiNodeBenchmark {
 
             keyValueView.put(null, Tuple.create().set("ycsb_key", id++), t);
         }
+
+        transactions = publicIgnite.transactions();
     }
 
     /**
@@ -172,7 +182,21 @@ public class SelectBenchmark extends 
AbstractMultiNodeBenchmark {
      */
     @Benchmark
     public void kvGet(Blackhole bh) {
-        Tuple val = keyValueView.get(null, Tuple.create().set("ycsb_key", 
random.nextInt(TABLE_SIZE)));
+        doKvGet(null, bh);
+    }
+
+    /**
+     * Benchmark for KV get in RO transaction via embedded client.
+     */
+    @Benchmark
+    public void kvGetInRoTransaction(Blackhole bh) {
+        transactions.runInTransaction(tx -> {
+            doKvGet(tx, bh);
+        }, readOnlyTransactionOptions);
+    }
+
+    private void doKvGet(@Nullable Transaction tx, Blackhole bh) {
+        Tuple val = keyValueView.get(tx, Tuple.create().set("ycsb_key", 
random.nextInt(TABLE_SIZE)));
         bh.consume(val);
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index da4f3d518a..9a221c5551 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -638,7 +638,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 clusterSvc.topologyService(),
                 clusterSvc.messagingService(),
                 transactionInflights,
-                txManager
+                txManager,
+                lowWatermark
         );
 
         var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 79486d742d..3b560667e3 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -994,7 +994,8 @@ public class IgniteImpl implements Ignite {
                 clusterSvc.topologyService(),
                 messagingServiceReturningToStorageOperationsPool,
                 transactionInflights,
-                txManager
+                txManager,
+                lowWatermark
         );
 
         StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
index 744adf056f..c9fb0379b0 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.wrapper.Wrappers;
 import org.apache.ignite.table.IgniteTables;
@@ -110,4 +111,13 @@ public class TestWrappers {
     public static Transaction unwrapIgniteTransaction(Transaction tx) {
         return Wrappers.unwrap(tx, Transaction.class);
     }
+
+    /**
+     * Unwraps an {@link InternalTransaction} from an {@link Transaction}.
+     *
+     * @param tx Object to unwrap.
+     */
+    public static InternalTransaction unwrapInternalTransaction(Transaction 
tx) {
+        return Wrappers.unwrap(tx, InternalTransaction.class);
+    }
 }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
index 0aa757cb7b..bee186f5e3 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.IgniteSql;
@@ -142,6 +143,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @Tag("sqllogic")
 @ExtendWith({SystemPropertiesExtension.class, WorkDirectoryExtension.class})
 @WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
+// The following is to make sure we unlock LWM on data nodes promptly so that 
dropped tables are destroyed fast.
+@WithSystemProperty(key = 
ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = 
"1000")
 @SqlLogicTestEnvironment(scriptsRoot = "src/integrationTest/sql/group1")
 public class ItSqlLogicTest extends BaseIgniteAbstractTest {
     private static final String SQL_LOGIC_TEST_INCLUDE_SLOW = 
"SQL_LOGIC_TEST_INCLUDE_SLOW";
@@ -348,8 +351,8 @@ public class ItSqlLogicTest extends BaseIgniteAbstractTest {
                 .clusterName("cluster")
                 .clusterConfiguration("ignite {"
                         + "metaStorage.idleSyncTimeInterval: " + 
METASTORAGE_IDLE_SYNC_TIME_INTERVAL_MS + ",\n"
-                        + "gc.lowWatermark.dataAvailabilityTime: 1010,\n"
-                        + "gc.lowWatermark.updateInterval: 3000,\n"
+                        + "gc.lowWatermark.dataAvailabilityTime: 5000,\n"
+                        + "gc.lowWatermark.updateInterval: 1000,\n"
                         + "metrics.exporters.logPush.exporterName: logPush,\n"
                         + "metrics.exporters.logPush.period: 5000\n"
                         + "}")
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 0266b7fde4..bcce6ff3e1 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -197,7 +197,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                         clusterNodeResolver,
                         resourcesRegistry,
                         schemaRegistry,
-                        mock(IndexMetaStorage.class)
+                        mock(IndexMetaStorage.class),
+                        lowWatermark
                 ) {
                     @Override
                     public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, UUID senderId) {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
index fededae649..5951a4a2b1 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.table;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
+import static 
org.apache.ignite.internal.TestWrappers.unwrapInternalTransaction;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
@@ -125,7 +124,7 @@ public class ItTransactionPrimaryChangeTest extends 
ClusterPerTestIntegrationTes
 
         // Put some value into the table.
         Transaction txPreload = txCrdNode.transactions().begin();
-        log.info("Test: Preloading the data [tx={}].", 
((ReadWriteTransactionImpl) unwrapIgniteTransaction(txPreload)).id());
+        log.info("Test: Preloading the data [tx={}].", 
unwrapInternalTransaction(txPreload).id());
         view.upsert(txPreload, Tuple.create().set("key", 1).set("val", "1"));
         txPreload.commit();
 
@@ -171,7 +170,7 @@ public class ItTransactionPrimaryChangeTest extends 
ClusterPerTestIntegrationTes
             // Start a regular transaction that increments the value. It 
should see the initially inserted value and its commit should
             // succeed.
             Transaction tx = txCrdNode.transactions().begin();
-            log.info("Test: Started the regular transaction [txId={}].", 
((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id());
+            log.info("Test: Started the regular transaction [txId={}].", 
unwrapInternalTransaction(tx).id());
 
             Tuple t = view.get(tx, Tuple.create().set("key", 1));
             assertEquals("1", t.value(1));
@@ -179,7 +178,7 @@ public class ItTransactionPrimaryChangeTest extends 
ClusterPerTestIntegrationTes
 
             tx.commit();
 
-            log.info("Test: Completed the regular transaction [txId={}].", 
((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id());
+            log.info("Test: Completed the regular transaction [txId={}].", 
unwrapInternalTransaction(tx).id());
         } finally {
             regularTxComplete.complete(null);
         }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index b25124576f..37e819f23e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -1063,7 +1063,9 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         assertFalse(scanned.isDone());
 
-        assertEquals(initialCursorsCount + 1, 
targetNode.resourcesRegistry().resources().size());
+        // One for the cursor; for RO, there is also the transaction LWM lock.
+        int delta = tx.isReadOnly() ? 2 : 1;
+        assertEquals(initialCursorsCount + delta, 
targetNode.resourcesRegistry().resources().size());
     }
 
     @Test
@@ -1110,8 +1112,8 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         scanSingleEntryAndLeaveCursorOpen(txExecNode, 
unwrapTableImpl(txCrdNode.tables().table(TABLE_NAME)), roTx);
 
-        // After the RO scan there should be one open cursor.
-        assertEquals(1, txExecNode.resourcesRegistry().resources().size());
+        // After the RO scan there should be one open cursor plus transaction 
LWM lock resource.
+        assertEquals(2, txExecNode.resourcesRegistry().resources().size());
 
         roTx.commit();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 835860e2a5..600b3edcd0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -92,6 +92,7 @@ public interface InternalTable extends ManuallyCloseable {
      */
     CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable 
InternalTransaction tx);
 
+    // TODO: remove get() methods which do not accept InternalTransaction, see 
IGNITE-24120.
     /**
      * Asynchronously gets a row with same key columns values as given one 
from the table on a specific node for the proposed readTimestamp.
      *
@@ -100,9 +101,29 @@ public interface InternalTable extends ManuallyCloseable {
      * @param recipientNode Cluster node that will handle given get request.
      * @return Future representing pending completion of the operation.
      */
+    default CompletableFuture<BinaryRow> get(
+            BinaryRowEx keyRow,
+            HybridTimestamp readTimestamp,
+            ClusterNode recipientNode
+    ) {
+        return get(keyRow, readTimestamp, null, null, recipientNode);
+    }
+
+    /**
+     * Asynchronously gets a row with same key columns values as given one 
from the table on a specific node for the proposed readTimestamp.
+     *
+     * @param keyRow        Row with key columns set.
+     * @param readTimestamp Read timestamp.
+     * @param transactionId Transaction ID (might be {@code null}).
+     * @param coordinatorId Ephemeral ID of the transaction coordinator.
+     * @param recipientNode Cluster node that will handle given get request.
+     * @return Future representing pending completion of the operation.
+     */
     CompletableFuture<BinaryRow> get(
             BinaryRowEx keyRow,
             HybridTimestamp readTimestamp,
+            @Nullable UUID transactionId,
+            @Nullable UUID coordinatorId,
             ClusterNode recipientNode
     );
 
@@ -117,6 +138,7 @@ public interface InternalTable extends ManuallyCloseable {
      */
     CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, 
@Nullable InternalTransaction tx);
 
+    // TODO: remove getAll() methods which do not accept InternalTransaction, 
see IGNITE-24120.
     /**
      * Asynchronously get rows from the table for the proposed read timestamp.
      *
@@ -127,13 +149,34 @@ public interface InternalTable extends ManuallyCloseable {
      *      guaranteed to be the same as the order of {@code keyRows}. If a 
record does not exist, the
      *      element at the corresponding index of the resulting collection is 
{@code null}.
      */
+    default CompletableFuture<List<BinaryRow>> getAll(
+            Collection<BinaryRowEx> keyRows,
+            HybridTimestamp readTimestamp,
+            ClusterNode recipientNode
+    ) {
+        return getAll(keyRows, readTimestamp, null, null, recipientNode);
+    }
+
+    /**
+     * Asynchronously get rows from the table for the proposed read timestamp.
+     *
+     * @param keyRows       Rows with key columns set.
+     * @param readTimestamp Read timestamp.
+     * @param transactionId Transaction ID (might be {@code null}).
+     * @param coordinatorId Ephemeral ID of the transaction coordinator.
+     * @param recipientNode Cluster node that will handle given get request.
+     * @return Future that will return rows with all columns filled from the 
table. The order of collection elements is
+     *      guaranteed to be the same as the order of {@code keyRows}. If a 
record does not exist, the
+     *      element at the corresponding index of the resulting collection is 
{@code null}.
+     */
     CompletableFuture<List<BinaryRow>> getAll(
             Collection<BinaryRowEx> keyRows,
             HybridTimestamp readTimestamp,
+            @Nullable UUID transactionId,
+            @Nullable UUID coordinatorId,
             ClusterNode recipientNode
     );
 
-
     /**
      * Asynchronously inserts a row into the table if does not exist or 
replaces the existed one.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b5762c6786..2bae27d71c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1364,7 +1364,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 topologyService,
                 remotelyTriggeredResourceRegistry,
                 schemaManager.schemaRegistry(tableId),
-                indexMetaStorage
+                indexMetaStorage,
+                lowWatermark
         );
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index e63c9f5070..ddd6d9370d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -103,6 +103,7 @@ import 
org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
@@ -221,6 +222,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.TransactionException;
@@ -344,6 +346,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private final IndexMetaStorage indexMetaStorage;
 
+    private final LowWatermark lowWatermark;
+
     private static final boolean SKIP_UPDATES =
             
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
 
@@ -395,7 +399,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             ClusterNodeResolver clusterNodeResolver,
             RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
             SchemaRegistry schemaRegistry,
-            IndexMetaStorage indexMetaStorage
+            IndexMetaStorage indexMetaStorage,
+            LowWatermark lowWatermark
     ) {
         this.mvDataStorage = mvDataStorage;
         this.raftCommandRunner = raftCommandRunner;
@@ -418,6 +423,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
         this.schemaRegistry = schemaRegistry;
         this.indexMetaStorage = indexMetaStorage;
+        this.lowWatermark = lowWatermark;
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
 
@@ -534,7 +540,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         // Don't need to validate schema.
         if (opTs == null) {
             assert opTsIfDirectRo == null;
-            return processOperationRequestWithTxRwCounter(senderId, request, 
isPrimary, null, leaseStartTime);
+            return 
processOperationRequestWithTxOperationManagementLogic(senderId, request, 
isPrimary, null, leaseStartTime);
         }
 
         assert txTs != null && opTs.compareTo(txTs) >= 0 : "Invalid request 
timestamps";
@@ -555,7 +561,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         };
 
         return 
schemaSyncService.waitForMetadataCompleteness(opTs).thenRun(validateClo).thenCompose(ignored
 ->
-                processOperationRequestWithTxRwCounter(senderId, request, 
isPrimary, opTsIfDirectRo, leaseStartTime));
+                
processOperationRequestWithTxOperationManagementLogic(senderId, request, 
isPrimary, opTsIfDirectRo, leaseStartTime));
     }
 
     private CompletableFuture<Long> processGetEstimatedSizeRequest() {
@@ -2196,9 +2202,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                         }))
                 );
             }
-        } catch (Exception e) {
-            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
-                    format("Unable to close cursor [tableId={}]", tableId()), 
e);
         }
     }
 
@@ -3966,13 +3969,40 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
     }
 
-    private CompletableFuture<?> processOperationRequestWithTxRwCounter(
+    private CompletableFuture<?> 
processOperationRequestWithTxOperationManagementLogic(
             UUID senderId,
             ReplicaRequest request,
             @Nullable Boolean isPrimary,
             @Nullable HybridTimestamp opStartTsIfDirectRo,
             @Nullable Long leaseStartTime
     ) {
+        incrementRwOperationCountIfNeeded(request);
+
+        UUID txIdLockingLwm = tryToLockLwmIfNeeded(request, 
opStartTsIfDirectRo);
+
+        try {
+            return processOperationRequest(senderId, request, isPrimary, 
opStartTsIfDirectRo, leaseStartTime)
+                    .whenComplete((unused, throwable) -> {
+                        unlockLwmIfNeeded(txIdLockingLwm, request);
+                        decrementRwOperationCountIfNeeded(request);
+                    });
+        } catch (Throwable e) {
+            try {
+                unlockLwmIfNeeded(txIdLockingLwm, request);
+            } catch (Throwable unlockProblem) {
+                e.addSuppressed(unlockProblem);
+            }
+
+            try {
+                decrementRwOperationCountIfNeeded(request);
+            } catch (Throwable decrementProblem) {
+                e.addSuppressed(decrementProblem);
+            }
+            throw e;
+        }
+    }
+
+    private void incrementRwOperationCountIfNeeded(ReplicaRequest request) {
         if (request instanceof ReadWriteReplicaRequest) {
             int rwTxActiveCatalogVersion = 
rwTxActiveCatalogVersion(catalogService, (ReadWriteReplicaRequest) request);
 
@@ -3982,15 +4012,88 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 throw new 
StaleTransactionOperationException(((ReadWriteReplicaRequest) 
request).transactionId());
             }
         }
+    }
 
-        return processOperationRequest(senderId, request, isPrimary, 
opStartTsIfDirectRo, leaseStartTime)
-                .whenComplete((unused, throwable) -> {
-                    if (request instanceof ReadWriteReplicaRequest) {
-                        txRwOperationTracker.decrementOperationCount(
-                                rwTxActiveCatalogVersion(catalogService, 
(ReadWriteReplicaRequest) request)
-                        );
-                    }
-                });
+    private void decrementRwOperationCountIfNeeded(ReplicaRequest request) {
+        if (request instanceof ReadWriteReplicaRequest) {
+            txRwOperationTracker.decrementOperationCount(
+                    rwTxActiveCatalogVersion(catalogService, 
(ReadWriteReplicaRequest) request)
+            );
+        }
+    }
+
+    /**
+     * Generates a fake transaction ID that will only be used to identify one 
direct RO operation for purposes of locking and unlocking LWM.
+     * It should not be used as a replacement for a real transaction ID in 
other contexts.
+     */
+    private static UUID newFakeTxId() {
+        return UUID.randomUUID();
+    }
+
+    /**
+     * For an operation of an RO transaction, attempts to lock LWM on current 
node (either if the operation is not direct,
+     * or if it's direct and concerns more than one key), and does nothing for 
other types of requests.
+     *
+     * <p>If lock attempt fails, throws an exception with a specific error 
code ({@link Transactions#TX_STALE_READ_ONLY_OPERATION_ERR}).
+     *
+     * <p>For explicit RO transactions, the lock will be later released when 
cleaning up after the RO transaction had been finished.
+     *
+     * <p>For direct RO operations (which happen in implicit RO transactions), 
LWM will be unlocked right after the read had been done
+     * (see {@link #unlockLwmIfNeeded(UUID, ReplicaRequest)}).
+     *
+     * <p>Also, for explicit RO transactions, an automatic unlock is 
registered on coordinator leave.
+     *
+     * @param request Request that is being handled.
+     * @param opStartTsIfDirectRo Timestamp of operation start if the 
operation is a direct RO operation, {@code null} otherwise.
+     * @return Transaction ID (real for explicit transaction, fake for direct 
RO operation) that shoiuld be used to lock LWM,
+     *     or {@code null} if LWM doesn't need to be locked..
+     */
+    private @Nullable UUID tryToLockLwmIfNeeded(ReplicaRequest request, 
@Nullable HybridTimestamp opStartTsIfDirectRo) {
+        UUID txIdToLockLwm;
+        HybridTimestamp tsToLockLwm = null;
+
+        if (request instanceof ReadOnlyDirectMultiRowReplicaRequest
+                && ((ReadOnlyDirectMultiRowReplicaRequest) 
request).primaryKeys().size() > 1) {
+            assert opStartTsIfDirectRo != null;
+
+            txIdToLockLwm = newFakeTxId();
+            tsToLockLwm = opStartTsIfDirectRo;
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            ReadOnlyReplicaRequest readOnlyRequest = (ReadOnlyReplicaRequest) 
request;
+            txIdToLockLwm = readOnlyRequest.transactionId();
+            tsToLockLwm = readOnlyRequest.readTimestamp();
+        } else {
+            txIdToLockLwm = null;
+        }
+
+        if (txIdToLockLwm != null) {
+            if (!lowWatermark.tryLock(txIdToLockLwm, tsToLockLwm)) {
+                throw new 
TransactionException(Transactions.TX_STALE_READ_ONLY_OPERATION_ERR, "Read 
timestamp is not available anymore.");
+            }
+
+            registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(request, 
txIdToLockLwm);
+        }
+
+        return txIdToLockLwm;
+    }
+
+    private void 
registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(ReplicaRequest request, UUID 
txIdToLockLwm) {
+        if (request instanceof ReadOnlyReplicaRequest) {
+            ReadOnlyReplicaRequest readOnlyReplicaRequest = 
(ReadOnlyReplicaRequest) request;
+
+            UUID coordinatorId = readOnlyReplicaRequest.coordinatorId();
+            // TODO: remove null check after IGNITE-24120 is sorted out.
+            if (coordinatorId != null) {
+                FullyQualifiedResourceId resourceId = new 
FullyQualifiedResourceId(txIdToLockLwm, txIdToLockLwm);
+                remotelyTriggeredResourceRegistry.register(resourceId, 
coordinatorId, () -> () -> lowWatermark.unlock(txIdToLockLwm));
+            }
+        }
+    }
+
+    private void unlockLwmIfNeeded(@Nullable UUID txIdToUnlockLwm, 
ReplicaRequest request) {
+        if (txIdToUnlockLwm != null && request instanceof 
ReadOnlyDirectReplicaRequest) {
+            lowWatermark.unlock(txIdToUnlockLwm);
+        }
     }
 
     private void prepareIndexBuilderTxRwOperationTracker() {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 7f0b985cbc..be91eb7f07 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -888,7 +888,7 @@ public class InternalTableImpl implements InternalTable {
 
         if (tx.isReadOnly()) {
             return evaluateReadOnlyRecipientNode(partitionId(keyRow), 
tx.readTimestamp())
-                    .thenCompose(recipientNode -> get(keyRow, 
tx.readTimestamp(), recipientNode));
+                    .thenCompose(recipientNode -> get(keyRow, 
tx.readTimestamp(), tx.id(), tx.coordinatorId(), recipientNode));
         }
 
         return enlistInTx(
@@ -915,6 +915,8 @@ public class InternalTableImpl implements InternalTable {
     public CompletableFuture<BinaryRow> get(
             BinaryRowEx keyRow,
             HybridTimestamp readTimestamp,
+            @Nullable UUID transactionId,
+            @Nullable UUID coordinatorId,
             ClusterNode recipientNode
     ) {
         int partId = partitionId(keyRow);
@@ -927,6 +929,8 @@ public class InternalTableImpl implements InternalTable {
                 .primaryKey(keyRow.tupleSlice())
                 .requestType(RO_GET)
                 .readTimestamp(readTimestamp)
+                .transactionId(transactionId)
+                .coordinatorId(coordinatorId)
                 .build()
         );
     }
@@ -981,7 +985,7 @@ public class InternalTableImpl implements InternalTable {
             BinaryRowEx firstRow = keyRows.iterator().next();
 
             return evaluateReadOnlyRecipientNode(partitionId(firstRow), 
tx.readTimestamp())
-                    .thenCompose(recipientNode -> getAll(keyRows, 
tx.readTimestamp(), recipientNode));
+                    .thenCompose(recipientNode -> getAll(keyRows, 
tx.readTimestamp(), tx.id(), tx.coordinatorId(), recipientNode));
         }
 
         return enlistInTx(
@@ -999,6 +1003,8 @@ public class InternalTableImpl implements InternalTable {
     public CompletableFuture<List<BinaryRow>> getAll(
             Collection<BinaryRowEx> keyRows,
             HybridTimestamp readTimestamp,
+            @Nullable UUID transactionId,
+            @Nullable UUID coordinatorId,
             ClusterNode recipientNode
     ) {
         Int2ObjectMap<RowBatch> rowBatchByPartitionId = 
toRowBatchByPartitionId(keyRows);
@@ -1013,6 +1019,8 @@ public class InternalTableImpl implements InternalTable {
                     
.primaryKeys(serializeBinaryTuples(partitionRowBatch.getValue().requestedRows))
                     .requestType(RO_GET_ALL)
                     .readTimestamp(readTimestamp)
+                    .transactionId(transactionId)
+                    .coordinatorId(coordinatorId)
                     .build();
 
             partitionRowBatch.getValue().resultFuture = 
replicaSvc.invoke(recipientNode, request);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 0fa46d6cda..e55485ad01 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
@@ -278,7 +279,8 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 mock(ClusterNodeResolver.class),
                 new RemotelyTriggeredResourceRegistry(),
                 schemaManager,
-                mock(IndexMetaStorage.class)
+                mock(IndexMetaStorage.class),
+                new TestLowWatermark()
         );
 
         kvMarshaller = new 
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, 
Integer.class);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index df27d9ab5e..501f6d23de 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
@@ -245,7 +246,8 @@ public class PartitionReplicaListenerSortedIndexLockingTest 
extends IgniteAbstra
                 mock(ClusterNodeResolver.class),
                 new RemotelyTriggeredResourceRegistry(),
                 schemaManager,
-                mock(IndexMetaStorage.class)
+                mock(IndexMetaStorage.class),
+                new TestLowWatermark()
         );
 
         kvMarshaller = new 
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, 
Integer.class);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 9293aafa69..d5b09dec5a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -78,8 +78,12 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -120,6 +124,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.MessagingService;
@@ -139,6 +144,8 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.Build
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowPkReplicaRequest;
@@ -155,7 +162,9 @@ import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -172,6 +181,7 @@ import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.TestStorageUtils;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -244,6 +254,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.junitpioneer.jupiter.cartesian.ArgumentSets;
@@ -251,6 +262,7 @@ import org.junitpioneer.jupiter.cartesian.CartesianTest;
 import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.Spy;
@@ -388,6 +400,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Mock
     private MessagingService messagingService;
 
+    @Mock
+    private LowWatermark lowWatermark;
+
     @InjectConfiguration
     private StorageUpdateConfiguration storageUpdateConfiguration;
 
@@ -425,6 +440,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     /** Partition replication listener to test. */
     private PartitionReplicaListener partitionReplicaListener;
 
+    private HashIndexStorage pkIndexStorage;
+
     /** Primary index. */
     private Lazy<TableSchemaAwareIndexStorage> pkStorageSupplier;
 
@@ -513,14 +530,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         ColumnsExtractor row2Tuple = 
BinaryRowConverter.keyExtractor(schemaDescriptor);
 
-        pkStorageSupplier = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
-                pkIndexId,
-                new TestHashIndexStorage(
-                        PART_ID,
-                        new StorageHashIndexDescriptor(pkIndexId, List.of(), 
false)
-                ),
-                row2Tuple
+        pkIndexStorage = spy(new TestHashIndexStorage(
+                PART_ID,
+                new StorageHashIndexDescriptor(pkIndexId, List.of(), false)
         ));
+        pkStorageSupplier = new Lazy<>(() -> new 
TableSchemaAwareIndexStorage(pkIndexId, pkIndexStorage, row2Tuple));
 
         SortedIndexStorage indexStorage = new TestSortedIndexStorage(
                 PART_ID,
@@ -649,12 +663,15 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 new SingleClusterNodeResolver(localNode),
                 new RemotelyTriggeredResourceRegistry(),
                 new DummySchemaManagerImpl(schemaDescriptor, 
schemaDescriptorVersion2),
-                indexMetaStorage
+                indexMetaStorage,
+                lowWatermark
         );
 
         kvMarshaller = marshallerFor(schemaDescriptor);
         kvMarshallerVersion2 = marshallerFor(schemaDescriptorVersion2);
 
+        when(lowWatermark.tryLock(any(), any())).thenReturn(true);
+
         reset();
     }
 
@@ -810,20 +827,46 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk, 
HybridTimestamp readTimestamp) {
-        ReadOnlySingleRowPkReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
+        ReadOnlySingleRowPkReplicaRequest request = 
readOnlySingleRowPkReplicaRequest(pk, readTimestamp);
+
+        return invokeListener(request);
+    }
+
+    private CompletableFuture<ReplicaResult> invokeListener(ReplicaRequest 
request) {
+        return partitionReplicaListener.invoke(request, localNode.id());
+    }
+
+    private ReadOnlySingleRowPkReplicaRequest 
readOnlySingleRowPkReplicaRequest(BinaryRow pk, HybridTimestamp readTimestamp) {
+        return readOnlySingleRowPkReplicaRequest(grpId, newTxId(), 
localNode.id(), pk, readTimestamp);
+    }
+
+    private static ReadOnlySingleRowPkReplicaRequest 
readOnlySingleRowPkReplicaRequest(
+            TablePartitionId grpId,
+            UUID txId,
+            UUID coordinatorId,
+            BinaryRow pk,
+            HybridTimestamp readTimestamp
+    ) {
+        return TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .readTimestamp(readTimestamp)
                 .schemaVersion(pk.schemaVersion())
                 .primaryKey(pk.tupleSlice())
+                .transactionId(txId)
+                .coordinatorId(coordinatorId)
                 .requestType(RO_GET)
                 .build();
-
-        return partitionReplicaListener.invoke(request, localNode.id());
     }
 
     private CompletableFuture<ReplicaResult> 
doReadOnlyDirectSingleGet(BinaryRow pk) {
-        ReadOnlyDirectSingleRowReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
+        ReadOnlyDirectSingleRowReplicaRequest request = 
readOnlyDirectSingleRowReplicaRequest(grpId, pk);
+
+        return invokeListener(request);
+    }
+
+    private static ReadOnlyDirectSingleRowReplicaRequest 
readOnlyDirectSingleRowReplicaRequest(TablePartitionId grpId, BinaryRow pk) {
+        return TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .schemaVersion(pk.schemaVersion())
@@ -831,8 +874,6 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .requestType(RO_GET)
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build();
-
-        return partitionReplicaListener.invoke(request, localNode.id());
     }
 
     @Test
@@ -2083,19 +2124,28 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private CompletableFuture<?> doRoScanRetrieveBatchRequest(UUID targetTxId, 
HybridTimestamp readTimestamp) {
         return partitionReplicaListener.invoke(
-                
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
-                        .groupId(tablePartitionIdMessage(grpId))
-                        .tableId(TABLE_ID)
-                        .transactionId(targetTxId)
-                        .scanId(1)
-                        .batchSize(100)
-                        .readTimestamp(readTimestamp)
-                        .coordinatorId(localNode.id())
-                        .build(),
+                readOnlyScanRetrieveBatchReplicaRequest(grpId, targetTxId, 
readTimestamp, localNode.id()),
                 localNode.id()
         );
     }
 
+    private static ReadOnlyScanRetrieveBatchReplicaRequest 
readOnlyScanRetrieveBatchReplicaRequest(
+            TablePartitionId grpId,
+            UUID txId,
+            HybridTimestamp readTimestamp,
+            UUID coordinatorId
+    ) {
+        return TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+                .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
+                .transactionId(txId)
+                .scanId(1)
+                .batchSize(100)
+                .readTimestamp(readTimestamp)
+                .coordinatorId(coordinatorId)
+                .build();
+    }
+
     @ParameterizedTest
     @MethodSource("singleRowWriteRequestTypes")
     public void 
singleRowWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType) {
@@ -2750,14 +2800,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row, 
HybridTimestamp readTimestamp) {
-        ReadOnlySingleRowPkReplicaRequest message = 
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
-                .groupId(tablePartitionIdMessage(grpId))
-                .tableId(TABLE_ID)
-                .requestType(RO_GET)
-                .readTimestamp(readTimestamp)
-                .schemaVersion(row.schemaVersion())
-                .primaryKey(row.tupleSlice())
-                .build();
+        ReadOnlySingleRowPkReplicaRequest message = 
readOnlySingleRowPkReplicaRequest(row, readTimestamp);
 
         return partitionReplicaListener.invoke(message, 
localNode.id()).thenApply(replicaResult -> (BinaryRow) replicaResult.result());
     }
@@ -2771,20 +2814,41 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<ReplicaResult> 
doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
-        ReadOnlyMultiRowPkReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
+        ReadOnlyMultiRowPkReplicaRequest request = 
readOnlyMultiRowPkReplicaRequest(grpId, newTxId(), localNode.id(), rows, 
readTimestamp);
+
+        return invokeListener(request);
+    }
+
+    private static ReadOnlyMultiRowPkReplicaRequest 
readOnlyMultiRowPkReplicaRequest(
+            TablePartitionId grpId,
+            UUID txId,
+            UUID coordinatorId,
+            Collection<BinaryRow> rows,
+            HybridTimestamp readTimestamp
+    ) {
+        return TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .requestType(RO_GET_ALL)
                 .readTimestamp(readTimestamp)
                 .schemaVersion(rows.iterator().next().schemaVersion())
                 .primaryKeys(binaryRowsToBuffers(rows))
+                .transactionId(txId)
+                .coordinatorId(coordinatorId)
                 .build();
-
-        return partitionReplicaListener.invoke(request, localNode.id());
     }
 
     private CompletableFuture<ReplicaResult> 
doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) {
-        ReadOnlyDirectMultiRowReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
+        ReadOnlyDirectMultiRowReplicaRequest request = 
readOnlyDirectMultiRowReplicaRequest(grpId, rows);
+
+        return invokeListener(request);
+    }
+
+    private static ReadOnlyDirectMultiRowReplicaRequest 
readOnlyDirectMultiRowReplicaRequest(
+            TablePartitionId grpId,
+            Collection<BinaryRow> rows
+    ) {
+        return TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
                 .tableId(TABLE_ID)
                 .requestType(RO_GET_ALL)
@@ -2792,8 +2856,6 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .primaryKeys(binaryRowsToBuffers(rows))
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build();
-
-        return partitionReplicaListener.invoke(request, localNode.id());
     }
 
     private void cleanup(UUID txId) {
@@ -3113,7 +3175,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .rowIds(List.of())
                 .build();
 
-        return partitionReplicaListener.invoke(request, localNode.id());
+        return invokeListener(request);
     }
 
     private void completeBuiltIndexes(IndexStorage... indexStorages) {
@@ -3161,4 +3223,107 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .txStateMeta(transactionMetaMessage)
                 .build();
     }
+
+    @ParameterizedTest
+    @EnumSource(NonDirectReadOnlyRequestFactory.class)
+    void 
nonDirectReadOnlyRequestsLockLwmAndDoNotUnlockIt(NonDirectReadOnlyRequestFactory
 requestFactory) {
+        RequestContext context = new RequestContext(grpId, newTxId(), clock, 
nextBinaryKey(), localNode.id());
+        ReadOnlyReplicaRequest request = requestFactory.create(context);
+
+        assertThat(invokeListener(request), willCompleteSuccessfully());
+
+        verify(lowWatermark).tryLock(any(), eq(request.readTimestamp()));
+        verify(lowWatermark, never()).unlock(any());
+    }
+
+    @Test
+    void directReadOnlySingleRowRequestDoesNotLockLwm() {
+        ReadOnlyDirectReplicaRequest request = 
readOnlyDirectSingleRowReplicaRequest(grpId, nextBinaryKey());
+
+        assertThat(invokeListener(request), willCompleteSuccessfully());
+
+        verify(lowWatermark, never()).tryLock(any(), any());
+    }
+
+    @Test
+    void directReadOnlyMultiRowRequestWithOneKeyDoesNotLockLwm() {
+        ReadOnlyDirectReplicaRequest request = 
readOnlyDirectMultiRowReplicaRequest(grpId, List.of(nextBinaryKey()));
+
+        assertThat(invokeListener(request), willCompleteSuccessfully());
+
+        verify(lowWatermark, never()).tryLock(any(), any());
+    }
+
+    @Test
+    void directReadOnlyMultiRowRequestWithMultipleKeysLockAndUnlockLwm() {
+        ReadOnlyDirectReplicaRequest request = 
readOnlyDirectMultiRowReplicaRequest(grpId, List.of(nextBinaryKey(), 
nextBinaryKey()));
+
+        assertThat(invokeListener(request), willCompleteSuccessfully());
+
+        InOrder orderVerifier = inOrder(lowWatermark, pkIndexStorage);
+
+        ArgumentCaptor<UUID> lockTxIdCaptor = 
ArgumentCaptor.forClass(UUID.class);
+        ArgumentCaptor<UUID> unlockTxIdCaptor = 
ArgumentCaptor.forClass(UUID.class);
+
+        orderVerifier.verify(lowWatermark).tryLock(lockTxIdCaptor.capture(), 
any());
+        orderVerifier.verify(pkIndexStorage).get(any());
+        orderVerifier.verify(lowWatermark).unlock(unlockTxIdCaptor.capture());
+
+        assertThat(unlockTxIdCaptor.getValue(), is(lockTxIdCaptor.getValue()));
+    }
+
+    @Test
+    void 
directReadOnlyMultiRowRequestWithMultipleKeysUnlockLwmEvenWhenExceptionHappens()
 {
+        doThrow(new RuntimeException("Oops")).when(pkIndexStorage).get(any());
+
+        ReadOnlyDirectReplicaRequest request = 
readOnlyDirectMultiRowReplicaRequest(grpId, List.of(nextBinaryKey(), 
nextBinaryKey()));
+
+        assertThat(invokeListener(request), 
willThrowFast(RuntimeException.class, "Oops"));
+
+        verify(lowWatermark).unlock(any());
+    }
+
+    private static class RequestContext {
+        private final TablePartitionId groupId;
+        private final UUID txId;
+        private final HybridClock clock;
+        private final BinaryRow key;
+        private final UUID coordinatorId;
+
+        private RequestContext(TablePartitionId groupId, UUID txId, 
HybridClock clock, BinaryRow key, UUID coordinatorId) {
+            this.groupId = groupId;
+            this.txId = txId;
+            this.clock = clock;
+            this.key = key;
+            this.coordinatorId = coordinatorId;
+        }
+    }
+
+    private enum NonDirectReadOnlyRequestFactory {
+        SINGLE_GET(context -> readOnlySingleRowPkReplicaRequest(
+                context.groupId,
+                context.txId,
+                context.coordinatorId,
+                context.key,
+                context.clock.now())
+        ),
+        MULTI_GET(context -> readOnlyMultiRowPkReplicaRequest(
+                context.groupId,
+                context.txId,
+                context.coordinatorId,
+                singletonList(context.key),
+                context.clock.now()
+        )),
+        SCAN(context -> 
readOnlyScanRetrieveBatchReplicaRequest(context.groupId, context.txId, 
context.clock.now(), context.coordinatorId));
+
+        private final Function<RequestContext, ReadOnlyReplicaRequest> 
requestFactory;
+
+        NonDirectReadOnlyRequestFactory(Function<RequestContext, 
ReadOnlyReplicaRequest> requestFactory) {
+            this.requestFactory = requestFactory;
+        }
+
+        ReadOnlyReplicaRequest create(RequestContext context) {
+            return requestFactory.apply(context);
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 617bf30fa2..a893f75151 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -315,7 +316,8 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
                 clusterNodeResolver,
                 remotelyTriggeredResourceRegistry,
                 schemaRegistry,
-                indexMetaStorage
+                indexMetaStorage,
+                new TestLowWatermark()
         );
     }
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 6a80b56d74..1c052875c7 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -518,7 +518,8 @@ public class ItTxTestCluster {
                     clusterService.topologyService(),
                     clusterService.messagingService(),
                     transactionInflights,
-                    txMgr
+                    txMgr,
+                    lowWatermark
             );
 
             assertThat(txMgr.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
@@ -868,7 +869,8 @@ public class ItTxTestCluster {
                 clusterNodeResolver,
                 resourcesRegistry,
                 schemaRegistry,
-                mock(IndexMetaStorage.class)
+                mock(IndexMetaStorage.class),
+                lowWatermark
         );
     }
 
@@ -1081,7 +1083,8 @@ public class ItTxTestCluster {
                 client.topologyService(),
                 client.messagingService(),
                 clientTransactionInflights,
-                clientTxManager
+                clientTxManager,
+                lowWatermark
         );
 
         clientTxStateResolver = new TransactionStateResolver(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 403d19dbcf..23f2ea84c9 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -461,7 +461,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 mock(ClusterNodeResolver.class),
                 resourcesRegistry,
                 schemaManager,
-                mock(IndexMetaStorage.class)
+                mock(IndexMetaStorage.class),
+                new TestLowWatermark()
         );
 
         partitionListener = new PartitionListener(
diff --git a/modules/transactions/build.gradle 
b/modules/transactions/build.gradle
index a3b38079c4..8798cdeee0 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -56,6 +56,10 @@ dependencies {
     testImplementation libs.hamcrest.core
 
     integrationTestImplementation project(':ignite-api')
+    integrationTestImplementation project(':ignite-low-watermark')
+    integrationTestImplementation project(':ignite-schema')
+    integrationTestImplementation project(':ignite-configuration')
+    integrationTestImplementation project(':ignite-configuration-root')
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
     integrationTestImplementation(testFixtures(project(':ignite-sql-engine')))
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
new file mode 100644
index 0000000000..df078a35b6
--- /dev/null
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.readonly;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.TestWrappers.unwrapInternalTransaction;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
+import 
org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
+
+class ItReadOnlyTxAndLowWatermarkTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    // 100 keys to make sure that at least one key ends up on every of 2 nodes.
+    private static final int KEY_COUNT = 100;
+
+    private static final long SHORT_DATA_AVAILABILITY_TIME_MS = 1000;
+
+    @Override
+    protected int initialNodes() {
+        // 2 nodes to have a non-coordinator node in cluster.
+        return 2;
+    }
+
+    @Override
+    protected int[] cmgMetastoreNodes() {
+        return new int[] {0};
+    }
+
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        builder.clusterConfiguration("ignite.gc.lowWatermark: {\n"
+                // Update frequently.
+                + "  updateInterval: 100\n"
+                + "}");
+    }
+
+    @BeforeEach
+    void createTable() {
+        node(0).sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT 
PRIMARY KEY, VAL VARCHAR)");
+    }
+
+    @ParameterizedTest
+    @EnumSource(TransactionalReader.class)
+    void 
roTransactionNoticesTupleVersionsMissingDueToGcOnDataNodes(TransactionalReader 
reader) throws Exception {
+        // TODO: remove the assumption when IGNITE-24119 is fixed.
+        assumeFalse(reader == TransactionalReader.MULTI_GET);
+
+        updateDataAvailabilityTimeToShortPeriod();
+
+        Ignite coordinator = node(0);
+        KeyValueView<Integer, String> kvView = kvView(coordinator);
+
+        insertOriginalValuesToBothNodes(KEY_COUNT, kvView);
+
+        Transaction roTx = coordinator.transactions().begin(new 
TransactionOptions().readOnly(true));
+
+        updateToNewValues(KEY_COUNT, kvView);
+
+        waitTillLwmTriesToRaiseAndEraseOverrittenVersions();
+
+        IgniteException ex = assertThrows(IgniteException.class, () -> 
reader.read(coordinator, roTx));
+        assertThat(ex, isA(reader.sql() ? SqlException.class : 
TransactionException.class));
+        assertThat(ex, hasToString(containsString("Read timestamp is not 
available anymore.")));
+        assertThat("Wrong error code: " + ex.codeAsString(), ex.code(), 
is(Transactions.TX_STALE_READ_ONLY_OPERATION_ERR));
+    }
+
+    private void updateDataAvailabilityTimeToShortPeriod() {
+        IgniteImpl igniteImpl = unwrapIgniteImpl(node(0));
+
+        LowWatermarkConfiguration lwmConfig = igniteImpl.clusterConfiguration()
+                .getConfiguration(GcExtensionConfiguration.KEY)
+                .gc()
+                .lowWatermark();
+
+        
assertThat(lwmConfig.dataAvailabilityTime().update(SHORT_DATA_AVAILABILITY_TIME_MS),
 willCompleteSuccessfully());
+    }
+
+    private static KeyValueView<Integer, String> kvView(Ignite coordinator) {
+        return 
coordinator.tables().table(TABLE_NAME).keyValueView(Integer.class, 
String.class);
+    }
+
+    private void insertOriginalValuesToBothNodes(int keyCount, 
KeyValueView<Integer, String> kvView) throws Exception {
+        PartitionManager partitionManager = 
node(0).tables().table(TABLE_NAME).partitionManager();
+        Set<String> primaryNames = new HashSet<>();
+
+        for (int i = 0; i < keyCount; i++) {
+            kvView.put(null, i, "original-" + i);
+
+            if (primaryNames.size() < 2) {
+                ClusterNode primaryReplica = primaryReplicaFor(i, 
partitionManager);
+                primaryNames.add(primaryReplica.name());
+            }
+        }
+
+        assertThat("Expecting both nodes to host inserted keys", primaryNames, 
hasSize(2));
+    }
+
+    private static ClusterNode primaryReplicaFor(int key, PartitionManager 
partitionManager) throws Exception {
+        CompletableFuture<ClusterNode> primaryReplicaFuture = 
partitionManager.partitionAsync(key, Mapper.of(Integer.class))
+                .thenCompose(partitionManager::primaryReplicaAsync);
+
+        return primaryReplicaFuture.get(10, SECONDS);
+    }
+
+    private static void updateToNewValues(int keyCount, KeyValueView<Integer, 
String> kvView) {
+        for (int i = 0; i < keyCount; i++) {
+            kvView.put(null, i, "updated-" + i);
+        }
+    }
+
+    private static void waitTillLwmTriesToRaiseAndEraseOverrittenVersions() 
throws InterruptedException {
+        Thread.sleep(2 * SHORT_DATA_AVAILABILITY_TIME_MS);
+    }
+
+    @CartesianTest
+    @WithSystemProperty(key = 
ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = 
"100")
+    void lwmIsAllowedToBeRaisedOnDataNodesAfterRoTransactionFinish(
+            @Enum(TransactionalReader.class) TransactionalReader reader,
+            @Values(booleans = {true, false}) boolean commit
+    ) throws Exception {
+        // TODO: remove the assumption when IGNITE-24119 is fixed.
+        assumeFalse(reader == TransactionalReader.MULTI_GET);
+
+        Ignite coordinator = node(0);
+        KeyValueView<Integer, String> kvView = kvView(coordinator);
+
+        insertOriginalValuesToBothNodes(KEY_COUNT, kvView);
+
+        Transaction roTx = coordinator.transactions().begin(new 
TransactionOptions().readOnly(true));
+
+        reader.read(coordinator, roTx);
+
+        if (commit) {
+            roTx.commit();
+        } else {
+            roTx.rollback();
+        }
+
+        updateDataAvailabilityTimeToShortPeriod();
+
+        HybridTimestamp readTimestamp = 
unwrapInternalTransaction(roTx).readTimestamp();
+
+        assertLwmGrowsAbove(readTimestamp, node(0));
+        assertLwmGrowsAbove(readTimestamp, node(1));
+    }
+
+    private static void assertLwmGrowsAbove(HybridTimestamp ts, Ignite node) 
throws InterruptedException {
+        LowWatermarkImpl lowWatermark = unwrapIgniteImpl(node).lowWatermark();
+
+        assertTrue(
+                waitForCondition(
+                        () -> {
+                            HybridTimestamp lwm = 
lowWatermark.getLowWatermark();
+                            return lwm != null && lwm.compareTo(ts) > 0;
+                        },
+                        SECONDS.toMillis(10)
+                ),
+                "Did not see low watermark going up in time"
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TransactionalReader.class)
+    @WithSystemProperty(key = 
ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = 
"100")
+    void 
nonFinishedRoTransactionsOfCoordinatorsThatLeftDontHoldLwm(TransactionalReader 
reader) throws Exception {
+        // TODO: remove the assumption when IGNITE-24119 is fixed.
+        assumeFalse(reader == TransactionalReader.MULTI_GET);
+
+        Ignite coordinator = node(1);
+        KeyValueView<Integer, String> kvView = kvView(coordinator);
+
+        insertOriginalValuesToBothNodes(KEY_COUNT, kvView);
+
+        Transaction roTx = coordinator.transactions().begin(new 
TransactionOptions().readOnly(true));
+
+        // Do actual read(s) in the transaction.
+        reader.read(coordinator, roTx);
+
+        // Stop the coordinator.
+        stopNode(1);
+
+        updateDataAvailabilityTimeToShortPeriod();
+
+        HybridTimestamp readTimestamp = 
unwrapInternalTransaction(roTx).readTimestamp();
+
+        assertLwmGrowsAbove(readTimestamp, node(0));
+    }
+
+    private enum TransactionalReader {
+        SINGLE_GETS {
+            @Override
+            void read(Ignite ignite, Transaction transaction) {
+                KeyValueView<Integer, String> kvView = kvView(ignite);
+                for (int i = 0; i < KEY_COUNT; i++) {
+                    kvView.get(transaction, i);
+                }
+            }
+
+            @Override
+            boolean sql() {
+                return false;
+            }
+        },
+        MULTI_GET {
+            @Override
+            void read(Ignite ignite, Transaction transaction) {
+                List<Integer> keys = IntStream.range(0, 
KEY_COUNT).boxed().collect(toList());
+                kvView(ignite).getAll(transaction, keys);
+            }
+
+            @Override
+            boolean sql() {
+                return false;
+            }
+        },
+        SELECT_ALL {
+            @Override
+            void read(Ignite ignite, Transaction transaction) {
+                try (ResultSet<SqlRow> resultSet = 
ignite.sql().execute(transaction, "SELECT * FROM " + TABLE_NAME)) {
+                    resultSet.forEachRemaining(item -> {});
+                }
+            }
+
+            @Override
+            boolean sql() {
+                return true;
+            }
+        },
+        SELECT_COUNT {
+            @Override
+            void read(Ignite ignite, Transaction transaction) {
+                try (ResultSet<SqlRow> resultSet = 
ignite.sql().execute(transaction, "SELECT COUNT(*) FROM " + TABLE_NAME)) {
+                    resultSet.forEachRemaining(item -> {});
+                }
+            }
+
+            @Override
+            boolean sql() {
+                return true;
+            }
+        };
+
+        abstract void read(Ignite ignite, Transaction transaction);
+
+        abstract boolean sql();
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
index 1676be8d35..6e26562493 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
@@ -17,13 +17,15 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import java.util.UUID;
 import java.util.concurrent.Executor;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 
 /**
- * Handles Cursor Cleanup request ({@link FinishedTransactionsBatchMessage}).
+ * Handles Transaction cleanup request ({@link 
FinishedTransactionsBatchMessage}).
  */
 public class FinishedTransactionBatchRequestHandler {
     /** Messaging service. */
@@ -32,6 +34,8 @@ public class FinishedTransactionBatchRequestHandler {
     /** Resources registry. */
     private final RemotelyTriggeredResourceRegistry resourcesRegistry;
 
+    private final LowWatermark lowWatermark;
+
     private final Executor asyncExecutor;
 
     /**
@@ -39,15 +43,18 @@ public class FinishedTransactionBatchRequestHandler {
      *
      * @param messagingService Messaging service.
      * @param resourcesRegistry Resources registry.
+     * @param lowWatermark Low watermark.
      * @param asyncExecutor Executor to run cleanup commands.
      */
     public FinishedTransactionBatchRequestHandler(
             MessagingService messagingService,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
+            LowWatermark lowWatermark,
             Executor asyncExecutor
     ) {
         this.messagingService = messagingService;
         this.resourcesRegistry = resourcesRegistry;
+        this.lowWatermark = lowWatermark;
         this.asyncExecutor = asyncExecutor;
     }
 
@@ -63,7 +70,13 @@ public class FinishedTransactionBatchRequestHandler {
     }
 
     private void 
processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMessage 
closeCursorsMessage) {
-        asyncExecutor.execute(() -> 
closeCursorsMessage.transactions().forEach(resourcesRegistry::close));
+        asyncExecutor.execute(() -> 
closeCursorsMessage.transactions().forEach(this::cleanUpForTransaction));
+    }
+
+    private void cleanUpForTransaction(UUID transactionId) {
+        resourcesRegistry.close(transactionId);
+
+        lowWatermark.unlock(transactionId);
     }
 
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
index d8c5462371..823434b374 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
@@ -82,7 +82,7 @@ public class RemotelyTriggeredResourceRegistry {
 
                 resources.remove(resourceId);
 
-                
remoteRemoteHostResource(remotelyTriggeredResource.remoteHostId(), resourceId);
+                
removeRemoteHostResource(remotelyTriggeredResource.remoteHostId(), resourceId);
             } catch (Exception e) {
                 throw new ResourceCloseException(resourceId, 
remotelyTriggeredResource.remoteHostId(), e);
             }
@@ -131,7 +131,7 @@ public class RemotelyTriggeredResourceRegistry {
             for (FullyQualifiedResourceId resourceId : closedResources) {
                 resourcesWithContext.remove(resourceId);
 
-                remoteRemoteHostResource(remoteHostId, resourceId);
+                removeRemoteHostResource(remoteHostId, resourceId);
             }
         }
 
@@ -169,7 +169,7 @@ public class RemotelyTriggeredResourceRegistry {
         });
     }
 
-    private void remoteRemoteHostResource(UUID remoteHostId, 
FullyQualifiedResourceId resourceId) {
+    private void removeRemoteHostResource(UUID remoteHostId, 
FullyQualifiedResourceId resourceId) {
         remoteHostsToResources.computeIfPresent(remoteHostId, (k, v) -> {
             v.remove(resourceId);
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
index 589b73147f..81f33bafef 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -84,6 +85,7 @@ public class ResourceVacuumManager implements IgniteComponent 
{
      * @param messagingService Messaging service.
      * @param transactionInflights Transaction inflights.
      * @param txManager Transactional manager.
+     * @param lowWatermark Low watermark.
      */
     public ResourceVacuumManager(
             String nodeName,
@@ -91,7 +93,8 @@ public class ResourceVacuumManager implements IgniteComponent 
{
             TopologyService topologyService,
             MessagingService messagingService,
             TransactionInflights transactionInflights,
-            TxManager txManager
+            TxManager txManager,
+            LowWatermark lowWatermark
     ) {
         this.resourceRegistry = resourceRegistry;
         this.clusterNodeResolver = topologyService;
@@ -104,8 +107,12 @@ public class ResourceVacuumManager implements 
IgniteComponent {
                 messagingService,
                 transactionInflights
         );
-        this.finishedTransactionBatchRequestHandler =
-                new FinishedTransactionBatchRequestHandler(messagingService, 
resourceRegistry, resourceVacuumExecutor);
+        this.finishedTransactionBatchRequestHandler = new 
FinishedTransactionBatchRequestHandler(
+                messagingService,
+                resourceRegistry,
+                lowWatermark,
+                resourceVacuumExecutor
+        );
 
         this.txManager = txManager;
     }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java
new file mode 100644
index 0000000000..38359e4c6c
--- /dev/null
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FinishedTransactionBatchRequestHandlerTest extends 
BaseIgniteAbstractTest {
+    @Mock
+    private MessagingService messagingService;
+
+    @Mock
+    private RemotelyTriggeredResourceRegistry resourceRegistry;
+
+    @Mock
+    private LowWatermark lowWatermark;
+
+    private FinishedTransactionBatchRequestHandler requestHandler;
+
+    private NetworkMessageHandler networkHandler;
+
+    @BeforeEach
+    void createAndStartHandler() {
+        requestHandler = new FinishedTransactionBatchRequestHandler(
+                messagingService,
+                resourceRegistry,
+                lowWatermark,
+                ForkJoinPool.commonPool()
+        );
+        requestHandler.start();
+
+        ArgumentCaptor<NetworkMessageHandler> handlerCaptor = 
ArgumentCaptor.forClass(NetworkMessageHandler.class);
+        verify(messagingService).addMessageHandler(eq(TxMessageGroup.class), 
handlerCaptor.capture());
+
+        networkHandler = handlerCaptor.getValue();
+        assertThat(networkHandler, is(notNullValue()));
+    }
+
+    @Test
+    void unlocksLwm() {
+        UUID txId1 = new UUID(1, 1);
+        UUID txId2 = new UUID(2, 2);
+
+        FinishedTransactionsBatchMessage message = new 
TxMessagesFactory().finishedTransactionsBatchMessage()
+                .transactions(List.of(txId1, txId2))
+                .build();
+
+        networkHandler.onReceived(message, mock(ClusterNode.class), null);
+
+        verify(lowWatermark, timeout(10_000)).unlock(txId1);
+        verify(lowWatermark, timeout(10_000)).unlock(txId2);
+    }
+}


Reply via email to