This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 118a33b522 IGNITE-18527 Inline writeIntents into data storage on primary replica side (#1551)
118a33b522 is described below

commit 118a33b522ab90ea5ebb4aebe1da32f7b056d5e1
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Jan 26 14:56:34 2023 +0200

    IGNITE-18527 Inline writeIntents into data storage on primary replica side (#1551)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   2 +-
 .../app/ItIgniteInMemoryNodeRestartTest.java       |   6 +-
 .../storage/AbstractMvPartitionStorageTest.java    |  12 +
 .../ignite/distributed/ItTablePersistenceTest.java |  10 +-
 .../distributed/ItTxDistributedTestSingleNode.java |  48 ++--
 ...butedTestThreeNodesThreeReplicasCollocated.java |  10 +-
 .../ignite/internal/table/ItColocationTest.java    |   8 +-
 .../table/distributed/StorageUpdateHandler.java    | 144 ++++++++++
 .../internal/table/distributed/TableManager.java   |  42 ++-
 .../table/distributed/raft/PartitionListener.java  |  94 ++-----
 .../replicator/PartitionReplicaListener.java       | 153 +++++++++--
 .../distributed/replicator/TablePartitionId.java   |   8 +-
 .../distributed/storage/InternalTableImpl.java     |   8 +-
 .../apache/ignite/internal/table/TxLocalTest.java  |   5 +
 .../raft/PartitionCommandListenerTest.java         |  32 ++-
 .../PartitionReplicaListenerIndexLockingTest.java  |   7 +
 .../replication/PartitionReplicaListenerTest.java  | 291 +++++++++++++++++++--
 .../ignite/internal/table/TxAbstractTest.java      |  13 +-
 .../table/impl/DummyInternalTableImpl.java         |  25 +-
 19 files changed, 728 insertions(+), 190 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d455731179..dcdb73e759 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -281,7 +281,7 @@ public class ErrorGroups {
         public static final int TX_ROLLBACK_ERR = 
TX_ERR_GROUP.registerErrorCode(8);
 
         /** Failed to enlist read-write operation into read-only transaction. 
*/
-        public static final int TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR = 
TX_ERR_GROUP.registerErrorCode(9);
+        public static final int TX_FAILED_READ_WRITE_OPERATION_ERR = 
TX_ERR_GROUP.registerErrorCode(9);
 
         /** The error happens when the replica is not ready to handle a 
request. */
         public static final int TX_REPLICA_UNAVAILABLE_ERR = 
TX_ERR_GROUP.registerErrorCode(10);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index fb32865c60..f690e07052 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -210,7 +210,7 @@ public class ItIgniteInMemoryNodeRestartTest extends 
IgniteAbstractTest {
         assertTrue(IgniteTestUtils.waitForCondition(
                 () -> loza.localNodes().stream().anyMatch(nodeId -> {
                     if (nodeId.groupId() instanceof TablePartitionId) {
-                        return ((TablePartitionId) 
nodeId.groupId()).getTableId().equals(tableId);
+                        return ((TablePartitionId) 
nodeId.groupId()).tableId().equals(tableId);
                     }
 
                     return true;
@@ -251,7 +251,7 @@ public class ItIgniteInMemoryNodeRestartTest extends 
IgniteAbstractTest {
         assertTrue(IgniteTestUtils.waitForCondition(
                 () -> loza.localNodes().stream().anyMatch(nodeId -> {
                     if (nodeId.groupId() instanceof TablePartitionId) {
-                        return ((TablePartitionId) 
nodeId.groupId()).getTableId().equals(tableId);
+                        return ((TablePartitionId) 
nodeId.groupId()).tableId().equals(tableId);
                     }
 
                     return true;
@@ -294,7 +294,7 @@ public class ItIgniteInMemoryNodeRestartTest extends 
IgniteAbstractTest {
             assertTrue(IgniteTestUtils.waitForCondition(
                     () -> loza.localNodes().stream().anyMatch(nodeId -> {
                         if (nodeId.groupId() instanceof TablePartitionId) {
-                            return ((TablePartitionId) 
nodeId.groupId()).getTableId().equals(tableId);
+                            return ((TablePartitionId) 
nodeId.groupId()).tableId().equals(tableId);
                         }
 
                         return true;
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 35ff2865b7..8d2c9e7d65 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -97,6 +97,18 @@ public abstract class AbstractMvPartitionStorageTest extends 
BaseMvPartitionStor
 
         // Read with timestamp returns write-intent.
         assertRowMatches(read(rowId, clock.now()), tableRow);
+
+        // Remove write.
+        addWrite(rowId, null, txId);
+
+        // Removed row can't be read.
+        assertNull(read(rowId, HybridTimestamp.MAX_VALUE));
+
+        // Remove write once again.
+        addWrite(rowId, null, txId);
+
+        // Still can't be read.
+        assertNull(read(rowId, HybridTimestamp.MAX_VALUE));
     }
 
     /**
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d1f6067e07..ff896f324d 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -57,6 +57,8 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.tx.TxManager;
@@ -275,12 +277,14 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
 
                     var testMpPartStorage = new TestMvPartitionStorage(0);
 
+                    PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMpPartStorage);
+
+                    StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(0, partitionDataStorage, Map::of);
+
                     PartitionListener listener = new PartitionListener(
                             new TestPartitionDataStorage(testMpPartStorage),
+                            storageUpdateHandler,
                             new TestTxStateStorage(),
-                            txManager,
-                            Map::of,
-                            0,
                             new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0))
                     );
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 10d188c446..280c021d30 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
@@ -76,8 +77,10 @@ import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TxAbstractTest;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
@@ -90,6 +93,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -146,6 +150,8 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
     protected Int2ObjectOpenHashMap<RaftGroupService> custRaftClients;
 
+    protected Map<String, TxStateStorage> txStateStorages;
+
     protected final List<ClusterService> cluster = new 
CopyOnWriteArrayList<>();
 
     private ScheduledThreadPoolExecutor executor;
@@ -250,6 +256,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
         replicaManagers = new HashMap<>(nodes);
         replicaServices = new HashMap<>(nodes);
         txManagers = new HashMap<>(nodes);
+        txStateStorages = new HashMap<>(nodes);
 
         executor = new ScheduledThreadPoolExecutor(20,
                 new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
@@ -296,6 +303,8 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
             txMgr.start();
 
             txManagers.put(node.name(), txMgr);
+
+            txStateStorages.put(node.name(), new TestTxStateStorage());
         }
 
         log.info("Raft servers have been started");
@@ -313,18 +322,16 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
         String localNodeName = 
accRaftClients.get(0).clusterService().topologyService().localMember().name();
 
-        TxManager txMgr;
-
         if (startClient()) {
-            txMgr = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(), 
clientClock);
+            clientTxManager = new TxManagerImpl(clientReplicaSvc, new 
HeapLockManager(), clientClock);
         } else {
             // Collocated mode.
-            txMgr = txManagers.get(localNodeName);
+            clientTxManager = txManagers.get(localNodeName);
         }
 
-        assertNotNull(txMgr);
+        assertNotNull(clientTxManager);
 
-        igniteTransactions = new IgniteTransactionsImpl(txMgr);
+        igniteTransactions = new IgniteTransactionsImpl(clientTxManager);
 
         this.accounts = new TableImpl(new InternalTableImpl(
                 accountsName,
@@ -332,12 +339,12 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                 accRaftClients,
                 1,
                 consistentIdToNode,
-                txMgr,
+                clientTxManager,
                 Mockito.mock(MvTableStorage.class),
                 Mockito.mock(TxStateTableStorage.class),
                 startClient() ? clientReplicaSvc : 
replicaServices.get(localNodeName),
                 startClient() ? clientClock : clocks.get(localNodeName)
-        ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), txMgr.lockManager());
+        ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), 
clientTxManager.lockManager());
 
         this.customers = new TableImpl(new InternalTableImpl(
                 customersName,
@@ -345,12 +352,12 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                 custRaftClients,
                 1,
                 consistentIdToNode,
-                txMgr,
+                clientTxManager,
                 Mockito.mock(MvTableStorage.class),
                 Mockito.mock(TxStateTableStorage.class),
                 startClient() ? clientReplicaSvc : 
replicaServices.get(localNodeName),
                 startClient() ? clientClock : clocks.get(localNodeName)
-        ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), txMgr.lockManager());
+        ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), 
clientTxManager.lockManager());
 
         log.info("Tables have been started");
     }
@@ -388,7 +395,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
             for (String assignment : partAssignments) {
                 var testMpPartStorage = new TestMvPartitionStorage(0);
-                var txStateStorage = new TestTxStateStorage();
+                var txStateStorage = txStateStorages.get(assignment);
                 var placementDriver = new 
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
 
                 for (int part = 0; part < assignments.size(); part++) {
@@ -421,15 +428,17 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                 PendingComparableValuesTracker<HybridTimestamp> safeTime =
                         new 
PendingComparableValuesTracker<>(clocks.get(assignment).now());
 
+                PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMpPartStorage);
+                Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () 
-> Map.of(pkStorage.get().id(), pkStorage.get());
+                StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(partId, partitionDataStorage, indexes);
+
                 CompletableFuture<Void> partitionReadyFuture = 
raftServers.get(assignment).startRaftGroupNode(
                         new RaftNodeId(grpId, configuration.peer(assignment)),
                         configuration,
                         new PartitionListener(
-                                new 
TestPartitionDataStorage(testMpPartStorage),
-                                new TestTxStateStorage(),
-                                txManagers.get(assignment),
-                                () -> Map.of(pkStorage.get().id(), 
pkStorage.get()),
-                                partId,
+                                partitionDataStorage,
+                                storageUpdateHandler,
+                                txStateStorage,
                                 safeTime
                         ),
                         RaftGroupEventsListener.noopLsnr
@@ -454,6 +463,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                                                 safeTime,
                                                 txStateStorage,
                                                 placementDriver,
+                                                storageUpdateHandler,
                                                 peer -> 
assignment.equals(peer.consistentId()),
                                                 
CompletableFuture.completedFuture(schemaManager)
                                         )
@@ -585,6 +595,12 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
         return network;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    protected TxManager clientTxManager() {
+        return clientTxManager;
+    }
+
     /** {@inheritDoc} */
     @Override
     protected TxManager txManager(Table t) {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
index db77fd739d..1eced21290 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.UUID;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.junit.jupiter.api.BeforeEach;
@@ -66,11 +65,10 @@ public class 
ItTxDistributedTestThreeNodesThreeReplicasCollocated extends ItTxDi
         tx.commit();
 
         assertTrue(waitForCondition(
-                () -> txManagers.values().stream()
-                        .filter(txManager -> txManager.state(txId) != null && 
txManager.state(txId)
-                                .equals(TxState.COMMITED))
-                        .collect(Collectors.toList())
-                        .size() >= 2,
+                () -> txStateStorages.values().stream()
+                        .map(txStateStorage -> txStateStorage.get(txId))
+                        .filter(txMeta -> txMeta != null && txMeta.txState() 
== TxState.COMMITED)
+                        .count() >= 2,
                 5_000));
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index ec3ac28631..3b54c3bc77 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -196,8 +196,8 @@ public class ItColocationTest {
 
                 return r.run(MSG_FACTORY.updateAllCommand()
                                 
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
-                                        .tableId(commitPartId.getTableId())
-                                        .partitionId(commitPartId.getPartId())
+                                        .tableId(commitPartId.tableId())
+                                        
.partitionId(commitPartId.partitionId())
                                         .build()
                                 )
                             .rowsToUpdate(rows)
@@ -209,8 +209,8 @@ public class ItColocationTest {
                 return r.run(MSG_FACTORY.updateCommand()
                         .tablePartitionId(
                                 MSG_FACTORY.tablePartitionIdMessage()
-                                        .tableId(commitPartId.getTableId())
-                                        .partitionId(commitPartId.getPartId())
+                                        .tableId(commitPartId.tableId())
+                                        
.partitionId(commitPartId.partitionId())
                                         .build()
                         )
                         .rowUuid(Timestamp.nextVersion().toUuid())
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
new file mode 100644
index 0000000000..dae55ead81
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handler for storage updates that can be performed on processing of primary 
replica requests and partition replication requests.
+ */
+public class StorageUpdateHandler {
+    /** Partition id. */
+    private final int partitionId;
+
+    /** Partition storage with access to MV data of a partition. */
+    private final PartitionDataStorage storage;
+
+    private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
+
+    /**
+     * The constructor.
+     *
+     * @param partitionId Partition id.
+     * @param storage Partition data storage.
+     * @param indexes Indexes supplier.
+     */
+    public StorageUpdateHandler(int partitionId, PartitionDataStorage storage, 
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes) {
+        this.partitionId = partitionId;
+        this.storage = storage;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Handle single update.
+     *
+     * @param txId Transaction id.
+     * @param rowUuid Row UUID.
+     * @param commitPartitionId Commit partition id.
+     * @param rowBuffer Row buffer.
+     * @param onReplication Callback on replication.
+     */
+    public void handleUpdate(
+            UUID txId,
+            UUID rowUuid,
+            TablePartitionId commitPartitionId,
+            ByteBuffer rowBuffer,
+            @Nullable Consumer<RowId> onReplication
+    ) {
+        storage.runConsistently(() -> {
+            TableRow row = rowBuffer != null ? new TableRow(rowBuffer) : null;
+            RowId rowId = new RowId(partitionId, rowUuid);
+            UUID commitTblId = commitPartitionId.tableId();
+            int commitPartId = commitPartitionId.partitionId();
+
+            storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+
+            if (onReplication != null) {
+                onReplication.accept(rowId);
+            }
+
+            addToIndexes(row, rowId);
+
+            return null;
+        });
+    }
+
+    /**
+     * Handle multiple updates.
+     *
+     * @param txId Transaction id.
+     * @param rowsToUpdate Collection of rows to update.
+     * @param commitPartitionId Commit partition id.
+     * @param onReplication On replication callback.
+     */
+    public void handleUpdateAll(
+            UUID txId,
+            Map<UUID, ByteBuffer> rowsToUpdate,
+            TablePartitionId commitPartitionId,
+            @Nullable Consumer<Collection<RowId>> onReplication
+    ) {
+        storage.runConsistently(() -> {
+            UUID commitTblId = commitPartitionId.tableId();
+            int commitPartId = commitPartitionId.partitionId();
+
+            if (!nullOrEmpty(rowsToUpdate)) {
+                List<RowId> rowIds = new ArrayList<>();
+
+                for (Map.Entry<UUID, ByteBuffer> entry : 
rowsToUpdate.entrySet()) {
+                    RowId rowId = new RowId(partitionId, entry.getKey());
+                    TableRow row = entry.getValue() != null ? new 
TableRow(entry.getValue()) : null;
+
+                    storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
+
+                    rowIds.add(rowId);
+                    addToIndexes(row, rowId);
+                }
+
+                if (onReplication != null) {
+                    onReplication.accept(rowIds);
+                }
+            }
+
+            return null;
+        });
+    }
+
+    private void addToIndexes(@Nullable TableRow tableRow, RowId rowId) {
+        if (tableRow == null) { // skip removes
+            return;
+        }
+
+        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
+            index.put(tableRow, rowId);
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index bb02f0016a..7a379d8b3c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -699,11 +699,19 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(clock.now());
 
+                CompletableFuture<MvPartitionStorage> mvPartitionStorageFut = 
getOrCreateMvPartition(internalTbl.storage(), partId);
+
+                CompletableFuture<PartitionDataStorage> 
partitionDataStorageFut = mvPartitionStorageFut
+                        .thenApply(mvPartitionStorage -> 
partitionDataStorage(mvPartitionStorage, internalTbl, partId));
+
+                CompletableFuture<StorageUpdateHandler> 
storageUpdateHandlerFut = partitionDataStorageFut
+                        .thenApply(storage -> new StorageUpdateHandler(partId, 
storage, table.indexStorageAdapters(partId)));
+
                 CompletableFuture<Void> startGroupFut;
 
                 // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
                 if (oldPartAssignment.isEmpty() && localMemberAssignment != 
null) {
-                    startGroupFut = 
getOrCreateMvPartition(internalTbl.storage(), 
partId).thenComposeAsync(mvPartitionStorage -> {
+                    startGroupFut = 
mvPartitionStorageFut.thenComposeAsync(mvPartitionStorage -> {
                         boolean hasData = 
mvPartitionStorage.lastAppliedIndex() > 0;
 
                         CompletableFuture<Boolean> fut;
@@ -742,7 +750,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 return completedFuture(null);
                             }
 
-                            return 
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId)
+                            return partitionDataStorageFut
+                                    .thenCompose(s -> storageUpdateHandlerFut)
+                                    .thenCompose(s -> 
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId))
                                     .thenAcceptAsync(txStatePartitionStorage 
-> {
                                         RaftGroupOptions groupOptions = 
groupOptionsForPartition(
                                                 internalTbl.storage(),
@@ -755,17 +765,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                                         var raftNodeId = new 
RaftNodeId(replicaGrpId, serverPeer);
 
+                                        PartitionDataStorage 
partitionDataStorage = partitionDataStorageFut.join();
+                                        StorageUpdateHandler 
storageUpdateHandler = storageUpdateHandlerFut.join();
+
                                         try {
                                             // TODO: use RaftManager 
interface, see https://issues.apache.org/jira/browse/IGNITE-18273
                                             ((Loza) 
raftMgr).startRaftGroupNode(
                                                     raftNodeId,
                                                     newConfiguration,
                                                     new PartitionListener(
-                                                            
partitionDataStorage(mvPartitionStorage, internalTbl, partId),
+                                                            
partitionDataStorage,
+                                                            
storageUpdateHandler,
                                                             
txStatePartitionStorage,
-                                                            txManager,
-                                                            
table.indexStorageAdapters(partId),
-                                                            partId,
                                                             safeTime
                                                     ),
                                                     new 
RebalanceRaftGroupEventsListener(
@@ -791,6 +802,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 }
 
                 startGroupFut
+                        .thenCompose(v -> storageUpdateHandlerFut)
                         .thenComposeAsync(v -> {
                             try {
                                 return 
raftMgr.startRaftGroupService(replicaGrpId, newConfiguration);
@@ -805,13 +817,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 return completedFuture(null);
                             }
 
-                            CompletableFuture<MvPartitionStorage> 
partitionStorageFuture =
-                                    
getOrCreateMvPartition(internalTbl.storage(), partId);
-
                             CompletableFuture<TxStateStorage> 
txStateStorageFuture =
                                     
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId);
 
-                            return 
partitionStorageFuture.thenAcceptBoth(txStateStorageFuture, (partitionStorage, 
txStateStorage) -> {
+                            StorageUpdateHandler storageUpdateHandler = 
storageUpdateHandlerFut.join();
+
+                            return 
mvPartitionStorageFut.thenAcceptBoth(txStateStorageFuture, (partitionStorage, 
txStateStorage) -> {
                                 try {
                                     replicaMgr.startReplica(replicaGrpId,
                                             new PartitionReplicaListener(
@@ -829,6 +840,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                     safeTime,
                                                     txStateStorage,
                                                     placementDriver,
+                                                    storageUpdateHandler,
                                                     this::isLocalPeer,
                                                     
schemaManager.schemaRegistry(causalityToken, tblId)
                                             )
@@ -1831,6 +1843,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                     if (shouldStartLocalServices) {
                         MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(internalTable.storage(), partId).join();
+                        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
+                        StorageUpdateHandler storageUpdateHandler =
+                                new StorageUpdateHandler(partId, 
partitionDataStorage, tbl.indexStorageAdapters(partId));
 
                         TxStateStorage txStatePartitionStorage = 
getOrCreateTxStateStorage(internalTable.txStateStorage(), partId);
 
@@ -1842,11 +1857,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         );
 
                         RaftGroupListener raftGrpLsnr = new PartitionListener(
-                                partitionDataStorage(mvPartitionStorage, 
internalTable, partId),
+                                partitionDataStorage,
+                                storageUpdateHandler,
                                 txStatePartitionStorage,
-                                txManager,
-                                tbl.indexStorageAdapters(partId),
-                                partId,
                                 safeTime
                         );
 
@@ -1891,6 +1904,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                             safeTime,
                                             txStatePartitionStorage,
                                             placementDriver,
+                                            storageUpdateHandler,
                                             TableManager.this::isLocalPeer,
                                             
completedFuture(schemaManager.schemaRegistry(tblId))
                                     )
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index dc39737492..37fe4cc51b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -19,22 +19,18 @@ package org.apache.ignite.internal.table.distributed.raft;
 
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_UNEXPECTED_STATE_ERR;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -47,25 +43,22 @@ import 
org.apache.ignite.internal.raft.service.CommittedConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
-import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
-import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
-import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -78,17 +71,12 @@ public class PartitionListener implements RaftGroupListener 
{
     /** Partition storage with access to MV data of a partition. */
     private final PartitionDataStorage storage;
 
+    /** Handler that processes storage updates. */
+    private final StorageUpdateHandler storageUpdateHandler;
+
     /** Storage of transaction metadata. */
     private final TxStateStorage txStateStorage;
 
-    /** Transaction manager. */
-    private final TxManager txManager;
-
-    private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
-
-    /** Partition ID. */
-    private final int partitionId;
-
     /** Rows that were inserted, updated or removed. */
     private final HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
 
@@ -99,23 +87,17 @@ public class PartitionListener implements RaftGroupListener 
{
      * The constructor.
      *
      * @param partitionDataStorage The storage.
-     * @param txManager Transaction manager.
-     * @param partitionId Partition ID this listener serves.
      * @param safeTime Safe time tracker.
      */
     public PartitionListener(
             PartitionDataStorage partitionDataStorage,
+            StorageUpdateHandler storageUpdateHandler,
             TxStateStorage txStateStorage,
-            TxManager txManager,
-            Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
-            int partitionId,
             PendingComparableValuesTracker<HybridTimestamp> safeTime
     ) {
         this.storage = partitionDataStorage;
+        this.storageUpdateHandler = storageUpdateHandler;
         this.txStateStorage = txStateStorage;
-        this.txManager = txManager;
-        this.indexes = indexes;
-        this.partitionId = partitionId;
         this.safeTime = safeTime;
 
         // TODO: IGNITE-18502 Implement a pending update storage
@@ -214,24 +196,20 @@ public class PartitionListener implements 
RaftGroupListener {
             return;
         }
 
-        storage.runConsistently(() -> {
-            TableRow row = cmd.rowBuffer() != null ? new 
TableRow(cmd.rowBuffer()) : null;
-            UUID rowUuid = cmd.rowUuid();
-            RowId rowId = new RowId(partitionId, rowUuid);
-            UUID txId = cmd.txId();
-            UUID commitTblId = cmd.tablePartitionId().tableId();
-            int commitPartId = cmd.tablePartitionId().partitionId();
-
-            storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
-
-            txsPendingRowIds.computeIfAbsent(txId, entry -> new 
HashSet<>()).add(rowId);
+        TxMeta txMeta = txStateStorage.get(cmd.txId());
+        if (txMeta != null && (txMeta.txState() == COMMITED || 
txMeta.txState() == ABORTED)) {
+            storage.lastApplied(commandIndex, commandTerm);
 
-            addToIndexes(row, rowId);
+            return;
+        }
 
-            storage.lastApplied(commandIndex, commandTerm);
+        storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), 
cmd.tablePartitionId().asTablePartitionId(), cmd.rowBuffer(),
+                rowId -> {
+                    txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new 
HashSet<>()).add(rowId);
 
-            return null;
-        });
+                    storage.lastApplied(commandIndex, commandTerm);
+                }
+        );
     }
 
     /**
@@ -247,28 +225,19 @@ public class PartitionListener implements 
RaftGroupListener {
             return;
         }
 
-        storage.runConsistently(() -> {
-            UUID txId = cmd.txId();
-            Map<UUID, ByteBuffer> rowsToUpdate = cmd.rowsToUpdate();
-            UUID commitTblId = cmd.tablePartitionId().tableId();
-            int commitPartId = cmd.tablePartitionId().partitionId();
-
-            if (!nullOrEmpty(rowsToUpdate)) {
-                for (Map.Entry<UUID, ByteBuffer> entry : 
rowsToUpdate.entrySet()) {
-                    RowId rowId = new RowId(partitionId, entry.getKey());
-                    TableRow row = entry.getValue() != null ? new 
TableRow(entry.getValue()) : null;
-
-                    storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
+        TxMeta txMeta = txStateStorage.get(cmd.txId());
+        if (txMeta != null && (txMeta.txState() == COMMITED || 
txMeta.txState() == ABORTED)) {
+            storage.lastApplied(commandIndex, commandTerm);
 
-                    txsPendingRowIds.computeIfAbsent(txId, entry0 -> new 
HashSet<>()).add(rowId);
+            return;
+        }
 
-                    addToIndexes(row, rowId);
-                }
+        storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), 
cmd.tablePartitionId().asTablePartitionId(), rowIds -> {
+            for (RowId rowId : rowIds) {
+                txsPendingRowIds.computeIfAbsent(cmd.txId(), entry0 -> new 
HashSet<>()).add(rowId);
             }
 
             storage.lastApplied(commandIndex, commandTerm);
-
-            return null;
         });
     }
 
@@ -357,9 +326,6 @@ public class PartitionListener implements RaftGroupListener 
{
 
             txsPendingRowIds.remove(txId);
 
-            // TODO: IGNITE-17638 TestOnly code, let's consider using Txn 
state map instead of states.
-            txManager.changeState(txId, null, cmd.commit() ? COMMITED : 
ABORTED);
-
             storage.lastApplied(commandIndex, commandTerm);
 
             return null;
@@ -458,16 +424,6 @@ public class PartitionListener implements 
RaftGroupListener {
         }
     }
 
-    private void addToIndexes(@Nullable TableRow tableRow, RowId rowId) {
-        if (tableRow == null) { // skip removes
-            return;
-        }
-
-        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
-            index.put(tableRow, rowId);
-        }
-    }
-
     /**
      * Returns underlying storage.
      */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index deb7211be4..9db426fca5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
@@ -79,6 +81,7 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import 
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
@@ -117,6 +120,7 @@ import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -158,6 +162,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Lock manager. */
     private final LockManager lockManager;
 
+    /** Handler that processes updates, writing them to storage. */
+    private final StorageUpdateHandler storageUpdateHandler;
+
     /**
      * Cursors map. The key of the map is internal Ignite uuid which consists 
of a transaction id ({@link UUID}) and a cursor id
      * ({@link Long}).
@@ -192,6 +199,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private final Function<Peer, Boolean> isLocalPeerChecker;
 
+    private final ConcurrentMap<UUID, TxCleanupReadyFutureList> 
txCleanupReadyFutures = new ConcurrentHashMap<>();
+
     private final CompletableFuture<SchemaRegistry> schemaFut;
 
     /**
@@ -210,6 +219,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param safeTime Safe time clock.
      * @param txStateStorage Transaction state storage.
      * @param placementDriver Placement driver.
+     * @param storageUpdateHandler Handler that processes updates, writing them 
to storage.
      * @param isLocalPeerChecker Function for checking that the given peer is 
local.
      * @param schemaFut Table schema.
      */
@@ -228,6 +238,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             PendingComparableValuesTracker<HybridTimestamp> safeTime,
             TxStateStorage txStateStorage,
             PlacementDriver placementDriver,
+            StorageUpdateHandler storageUpdateHandler,
             Function<Peer, Boolean> isLocalPeerChecker,
             CompletableFuture<SchemaRegistry> schemaFut
     ) {
@@ -246,6 +257,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         this.txStateStorage = txStateStorage;
         this.placementDriver = placementDriver;
         this.isLocalPeerChecker = isLocalPeerChecker;
+        this.storageUpdateHandler = storageUpdateHandler;
         this.schemaFut = schemaFut;
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -1040,18 +1052,44 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return failedFuture(e);
         }
 
-        HybridTimestampMessage timestampMsg = 
hybridTimestamp(request.commitTimestamp());
+        List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
 
-        TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand()
-                .txId(request.txId())
-                .commit(request.commit())
-                .commitTimestamp(timestampMsg)
-                .safeTime(hybridTimestamp(hybridClock.now()))
-                .build();
+        // TODO https://issues.apache.org/jira/browse/IGNITE-18617
+        // TODO https://issues.apache.org/jira/browse/IGNITE-18632
+        TxCleanupReadyFutureList futs = 
txCleanupReadyFutures.computeIfAbsent(request.txId(), k -> new 
TxCleanupReadyFutureList());
+
+        synchronized (futs) {
+            txUpdateFutures.addAll(futs.futures);
+
+            futs.futures.clear();
+
+            futs.finished = true;
+        }
+
+        if (txUpdateFutures.isEmpty()) {
+            releaseTxLocks(request.txId());
+
+            return completedFuture(null);
+        }
+
+        return allOf(txUpdateFutures.toArray(new 
CompletableFuture<?>[txUpdateFutures.size()])).thenCompose(v -> {
+            HybridTimestampMessage timestampMsg = 
hybridTimestamp(request.commitTimestamp());
+
+            TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand()
+                    .txId(request.txId())
+                    .commit(request.commit())
+                    .commitTimestamp(timestampMsg)
+                    .safeTime(hybridTimestamp(hybridClock.now()))
+                    .build();
+
+            return raftClient
+                    .run(txCleanupCmd)
+                    .thenRun(() -> releaseTxLocks(request.txId()));
+        });
+    }
 
-        return raftClient
-                .run(txCleanupCmd)
-                .thenRun(() -> 
lockManager.locks(request.txId()).forEachRemaining(lockManager::release));
+    private void releaseTxLocks(UUID txId) {
+        lockManager.locks(txId).forEachRemaining(lockManager::release);
     }
 
     /**
@@ -1218,7 +1256,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(result);
                     }
 
-                    return 
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, 
rowIdsToDelete, txId))
+                    return 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, 
txId))
                             .thenApply(ignored -> result);
                 });
             }
@@ -1254,7 +1292,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     CompletableFuture<Object> raftFut = 
rowIdsToDelete.isEmpty() ? completedFuture(null)
-                            : 
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, 
rowIdsToDelete, txId));
+                            : 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, 
txId));
 
                     return raftFut.thenApply(ignored -> result);
                 });
@@ -1315,7 +1353,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 }
                                 return convertedMap;
                             })
-                            .thenCompose(convertedMap -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(convertedMap -> applyUpdateAllCommand(
                                     updateAllCommand(committedPartitionId, 
convertedMap, txId)))
                             .thenApply(ignored -> {
                                 // Release short term locks.
@@ -1361,7 +1399,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(null);
                     }
 
-                    return 
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, 
rowsToUpdate, txId))
+                    return 
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate, 
txId))
                             .thenApply(ignored -> {
                                 // Release short term locks.
                                 for (CompletableFuture<IgniteBiTuple<RowId, 
Collection<Lock>>> rowIdFut : rowIdFuts) {
@@ -1404,6 +1442,59 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         });
     }
 
+    /**
+     * Executes an Update command.
+     *
+     * @param cmd Update command.
+     * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
+     */
+    private CompletableFuture<Object> applyUpdateCommand(UpdateCommand cmd) {
+        return applyUpdatingCommand(cmd.txId(), () -> {
+            storageUpdateHandler.handleUpdate(
+                    cmd.txId(),
+                    cmd.rowUuid(),
+                    cmd.tablePartitionId().asTablePartitionId(),
+                    cmd.rowBuffer(),
+                    null
+            );
+
+            return applyCmdWithExceptionHandling(cmd);
+        });
+    }
+
+    /**
+     * Executes an UpdateAll command.
+     *
+     * @param cmd UpdateAll command.
+     * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
+     */
+    private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand 
cmd) {
+        return applyUpdatingCommand(cmd.txId(), () -> {
+            storageUpdateHandler.handleUpdateAll(cmd.txId(), 
cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(), null);
+
+            return applyCmdWithExceptionHandling(cmd);
+        });
+    }
+
+    private CompletableFuture<Object> applyUpdatingCommand(UUID txId, 
Supplier<CompletableFuture<Object>> closure) {
+        CompletableFuture<Object> applyCmdFuture;
+
+        TxCleanupReadyFutureList futs = 
txCleanupReadyFutures.computeIfAbsent(txId, k -> new 
TxCleanupReadyFutureList());
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-18632
+        synchronized (futs) {
+            if (futs.finished) {
+                throw new 
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is 
already finished.");
+            }
+
+            applyCmdFuture = closure.get();
+
+            futs.futures.add(applyCmdFuture);
+        }
+
+        return applyCmdFuture;
+    }
+
     /**
      * Precesses single request.
      *
@@ -1436,7 +1527,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(ignored -> applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
rowId.uuid(), null, txId)))
                             .thenApply(ignored -> true);
                 });
@@ -1448,7 +1539,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(ignored -> applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
rowId.uuid(), null, txId)))
                             .thenApply(ignored -> row);
                 });
@@ -1465,7 +1556,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     return completedFuture(false);
                                 }
 
-                                return applyCmdWithExceptionHandling(
+                                return applyUpdateCommand(
                                         updateCommand(commitPartitionId, 
validatedRowId.uuid(), null, txId))
                                         .thenApply(ignored -> true);
                             });
@@ -1480,7 +1571,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     RowId rowId0 = new RowId(partId);
 
                     return toTableRow(searchRow).thenCompose(tableRow -> 
takeLocksForInsert(searchRow, rowId0, txId)
-                            .thenCompose(rowIdLock -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(rowIdLock -> applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
rowId0.uuid(), tableRow.byteBuffer(), txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
@@ -1502,7 +1593,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             : takeLocksForUpdate(searchRow, rowId0, txId);
 
                     return toTableRow(searchRow).thenCompose(tableRow -> 
lockFut
-                            .thenCompose(rowIdLock -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(rowIdLock -> applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
rowId0.uuid(), tableRow.byteBuffer(), txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
@@ -1524,7 +1615,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             : takeLocksForUpdate(searchRow, rowId0, txId);
 
                     return toTableRow(searchRow).thenCompose(tableRow -> 
lockFut
-                            .thenCompose(rowIdLock -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(rowIdLock -> applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
rowId0.uuid(), tableRow.byteBuffer(), txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
@@ -1542,7 +1633,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return toTableRow(searchRow).thenCompose(tableRow -> 
takeLocksForUpdate(searchRow, rowId, txId)
-                            .thenCompose(rowIdLock -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(rowIdLock -> applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
rowId.uuid(), tableRow.byteBuffer(), txId))
                                     .thenApply(ignored -> rowIdLock))
                             .thenApply(rowIdLock -> {
@@ -1560,7 +1651,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return toTableRow(searchRow).thenCompose(tableRow -> 
takeLocksForUpdate(searchRow, rowId, txId)
-                            .thenCompose(rowLock -> 
applyCmdWithExceptionHandling(
+                            .thenCompose(rowLock -> applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
rowId.uuid(), tableRow.byteBuffer(), txId))
                                     .thenApply(ignored -> rowLock))
                             .thenApply(rowIdLock -> {
@@ -1722,7 +1813,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 return completedFuture(false);
                             }
 
-                            return applyCmdWithExceptionHandling(
+                            return applyUpdateCommand(
                                     updateCommand(commitPartitionId, 
validatedRowId.get1().uuid(), tableRow.byteBuffer(), txId))
                                     .thenApply(ignored -> validatedRowId)
                                     .thenApply(rowIdLock -> {
@@ -1987,8 +2078,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private TablePartitionIdMessage tablePartitionId(TablePartitionId 
tablePartId) {
         return msgFactory.tablePartitionIdMessage()
-                .tableId(tablePartId.getTableId())
-                .partitionId(tablePartId.getPartId())
+                .tableId(tablePartId.tableId())
+                .partitionId(tablePartId.partitionId())
                 .build();
     }
 
@@ -2009,4 +2100,16 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private CompletableFuture<BinaryConverter> getConverterFuture(int 
schemaVersion) {
         return schemaFut.thenApply(schemaRegistry -> 
BinaryConverter.forRow(schemaRegistry.schema(schemaVersion)));
     }
+
+    /**
+     * Class that stores the list of futures for updates that can block tx 
cleanup, and the finished flag.
+     */
+    private static class TxCleanupReadyFutureList {
+        final List<CompletableFuture<?>> futures = new ArrayList<>();
+
+        /**
+         * Whether the transaction is finished and therefore locked for 
further updates by the started cleanup process.
+         */
+        boolean finished;
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
index 01aeaf3e56..cac005f024 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
@@ -44,20 +44,20 @@ public class TablePartitionId implements ReplicationGroupId 
{
     }
 
     /**
-     * Gets a pration id.
+     * Get the partition id.
      *
      * @return Partition id.
      */
-    public int getPartId() {
+    public int partitionId() {
         return partId;
     }
 
     /**
-     * Gets a table id.
+     * Get the table id.
      *
      * @return Table id.
      */
-    public UUID getTableId() {
+    public UUID tableId() {
         return tableId;
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index b0d1108257..44b4a300ee 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,7 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
-import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_REPLICA_UNAVAILABLE_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -208,7 +208,7 @@ public class InternalTableImpl implements InternalTable {
         if (tx != null && tx.isReadOnly()) {
             return failedFuture(
                     new TransactionException(
-                            TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR,
+                            TX_FAILED_READ_WRITE_OPERATION_ERR,
                             "Failed to enlist read-write operation into 
read-only transaction txId={" + tx.id() + '}'
                     )
             );
@@ -271,7 +271,7 @@ public class InternalTableImpl implements InternalTable {
         if (tx != null && tx.isReadOnly()) {
             return failedFuture(
                     new TransactionException(
-                            TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR,
+                            TX_FAILED_READ_WRITE_OPERATION_ERR,
                             "Failed to enlist read-write operation into 
read-only transaction txId={" + tx.id() + '}'
                     )
             );
@@ -933,7 +933,7 @@ public class InternalTableImpl implements InternalTable {
         if (tx != null && tx.isReadOnly()) {
             throw new TransactionException(
                     new TransactionException(
-                            TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR,
+                            TX_FAILED_READ_WRITE_OPERATION_ERR,
                             "Failed to enlist read-write operation into 
read-only transaction txId={" + tx.id() + '}'
                     )
             );
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index cefdb1bc27..dc57824eb4 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -112,6 +112,11 @@ public class TxLocalTest extends TxAbstractTest {
         // TODO asch IGNITE-15928 implement local scan
     }
 
+    @Override
+    protected TxManager clientTxManager() {
+        // Same manager instance that txManager(Table) returns in this test.
+        return txManager;
+    }
+
     @Override
     protected TxManager txManager(Table t) {
         return txManager;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index ac35be91fb..f148e6a4d9 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -83,6 +84,7 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
@@ -94,8 +96,6 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
-import org.apache.ignite.internal.tx.impl.HeapLockManager;
-import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
 import org.apache.ignite.internal.util.Cursor;
@@ -200,12 +200,14 @@ public class PartitionCommandListenerTest {
 
         safeTimeTracker = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
 
+        Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> 
Map.of(pkStorage.id(), pkStorage);
+
+        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(0, partitionDataStorage, indexes);
+
         commandListener = new PartitionListener(
                 partitionDataStorage,
+                storageUpdateHandler,
                 txStateStorage,
-                new TxManagerImpl(replicaService, new HeapLockManager(), 
hybridClock),
-                () -> Map.of(pkStorage.id(), pkStorage),
-                PARTITION_ID,
                 safeTimeTracker
         );
     }
@@ -290,12 +292,14 @@ public class PartitionCommandListenerTest {
 
         TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartitionStorage);
 
+        Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> 
Map.of(pkStorage.id(), pkStorage);
+
+        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes);
+
         PartitionListener testCommandListener = new PartitionListener(
                 partitionDataStorage,
+                storageUpdateHandler,
                 txStateStorage,
-                new TxManagerImpl(replicaService, new HeapLockManager(), new 
HybridClockImpl()),
-                () -> Map.of(pkStorage.id(), pkStorage),
-                PARTITION_ID,
                 new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
         );
 
@@ -577,8 +581,8 @@ public class PartitionCommandListenerTest {
         invokeBatchedCommand(msgFactory.updateAllCommand()
                 .tablePartitionId(
                         msgFactory.tablePartitionIdMessage()
-                                .tableId(commitPartId.getTableId())
-                                .partitionId(commitPartId.getPartId())
+                                .tableId(commitPartId.tableId())
+                                .partitionId(commitPartId.partitionId())
                                 .build())
                 .rowsToUpdate(rows)
                 .txId(txId)
@@ -613,8 +617,8 @@ public class PartitionCommandListenerTest {
         invokeBatchedCommand(msgFactory.updateAllCommand()
                 .tablePartitionId(
                         msgFactory.tablePartitionIdMessage()
-                                .tableId(commitPartId.getTableId())
-                                .partitionId(commitPartId.getPartId())
+                                .tableId(commitPartId.tableId())
+                                .partitionId(commitPartId.partitionId())
                                 .build())
                 .rowsToUpdate(rows)
                 .txId(txId)
@@ -647,8 +651,8 @@ public class PartitionCommandListenerTest {
         invokeBatchedCommand(msgFactory.updateAllCommand()
                 .tablePartitionId(
                         msgFactory.tablePartitionIdMessage()
-                                .tableId(commitPartId.getTableId())
-                                .partitionId(commitPartId.getPartId())
+                                .tableId(commitPartId.tableId())
+                                .partitionId(commitPartId.partitionId())
                                 .build())
                 .rowsToUpdate(keyRows)
                 .txId(txId)
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d3960dcd90..158be20534 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -60,6 +61,7 @@ import 
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
@@ -189,6 +191,11 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 new PendingComparableValuesTracker<>(CLOCK.now()),
                 new TestTxStateStorage(),
                 mock(PlacementDriver.class),
+                new StorageUpdateHandler(
+                        PART_ID,
+                        new 
TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE),
+                        () -> Map.of(pkStorage.get().id(), pkStorage.get())
+                ),
                 peer -> true,
                 CompletableFuture.completedFuture(schemaManager)
         );
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 6131815184..171fbc4816 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -17,7 +17,13 @@
 
 package org.apache.ignite.internal.table.distributed.replication;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,6 +33,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -35,8 +44,11 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -74,8 +86,11 @@ import 
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
@@ -93,12 +108,14 @@ import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -106,6 +123,8 @@ import org.mockito.Mock;
 
 /** There are tests for partition replica listener. */
 public class PartitionReplicaListenerTest extends IgniteAbstractTest {
+    private static final Supplier<CompletableFuture<?>> 
DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER = () -> completedFuture(null);
+
     /** Tx messages factory. */
     private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
 
@@ -138,6 +157,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private static PlacementDriver placementDriver = 
mock(PlacementDriver.class);
 
+    private static PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMvPartitionStorage);
+
     @Mock
     private static RaftGroupService mockRaftClient = 
mock(RaftGroupService.class);
 
@@ -176,16 +197,22 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     /** Secondary hash index. */
     private static TableSchemaAwareIndexStorage hashIndexStorage;
 
+    private static Supplier<CompletableFuture<?>> raftClientFutureSupplier = 
DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+
+    private static LockManager lockManager = new HeapLockManager();
+
     @BeforeAll
     private static void beforeAll() {
         
when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock 
-> {
             if (!localLeader) {
-                return CompletableFuture.completedFuture(new 
LeaderWithTerm(new Peer(anotherNode.name()), 1L));
+                return completedFuture(new LeaderWithTerm(new 
Peer(anotherNode.name()), 1L));
             }
 
-            return CompletableFuture.completedFuture(new LeaderWithTerm(new 
Peer(localNode.name()), 1L));
+            return completedFuture(new LeaderWithTerm(new 
Peer(localNode.name()), 1L));
         });
 
+        when(mockRaftClient.run(any())).thenAnswer(invocationOnMock -> 
raftClientFutureSupplier.get());
+
         when(topologySrv.getByConsistentId(any())).thenAnswer(invocationOnMock 
-> {
             String consistentId = invocationOnMock.getArgument(0);
             if (consistentId.equals(anotherNode.name())) {
@@ -213,11 +240,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
                 txMeta = new TxMeta(TxState.ABORTED, 
Collections.singletonList(grpId), txFixedTimestamp);
             }
-            return CompletableFuture.completedFuture(txMeta);
+            return completedFuture(txMeta);
         });
 
         PendingComparableValuesTracker safeTimeClock = 
mock(PendingComparableValuesTracker.class);
-        
when(safeTimeClock.waitFor(any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
         UUID pkIndexId = UUID.randomUUID();
         UUID sortedIndexId = UUID.randomUUID();
@@ -248,14 +275,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 rowConverter::toTuple
         ));
 
-        sortedIndexStorage = new TableSchemaAwareIndexStorage(
-                sortedIndexId,
-                new TestSortedIndexStorage(new 
SortedIndexDescriptor(sortedIndexId, List.of(
-                        new SortedIndexColumnDescriptor("intVal", 
NativeTypes.INT32, false, true)
-                ))),
-                row -> null,
-                row -> null
-        );
+        SortedIndexStorage indexStorage = new TestSortedIndexStorage(new 
SortedIndexDescriptor(sortedIndexId, List.of(
+                new SortedIndexColumnDescriptor("intVal", NativeTypes.INT32, 
false, true)
+        )));
+
+        sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId, 
indexStorage, row -> null, row -> null);
 
         hashIndexStorage = new TableSchemaAwareIndexStorage(
                 hashIndexId,
@@ -266,10 +290,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 row -> null
         );
 
-        LockManager lockManager = new HeapLockManager();
-
         IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true, 
lockManager, row2tuple);
-        IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId, 
lockManager, null, row2tuple);
+        IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId, 
lockManager, indexStorage, row2tuple);
         IndexLocker hashIndexLocker = new HashIndexLocker(hashIndexId, false, 
lockManager, row2tuple);
 
         DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
@@ -288,8 +310,13 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 safeTimeClock,
                 txStateStorage,
                 placementDriver,
+                new StorageUpdateHandler(
+                        partId,
+                        partitionDataStorage,
+                        () -> Map.of(pkStorage.get().id(), pkStorage.get())
+                ),
                 peer -> localNode.name().equals(peer.consistentId()),
-                CompletableFuture.completedFuture(schemaManager)
+                completedFuture(schemaManager)
         );
 
         marshallerFactory = new ReflectionMarshallerFactory();
@@ -306,6 +333,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         ((TestHashIndexStorage) pkStorage.get().storage()).clear();
         ((TestHashIndexStorage) hashIndexStorage.storage()).clear();
         ((TestSortedIndexStorage) sortedIndexStorage.storage()).clear();
+        testMvPartitionStorage.clear();
     }
 
     @Test
@@ -761,6 +789,225 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertEquals(3, rows.size());
     }
 
+    /**
+     * Sends a sequence of single-row read-write requests (insert, upsert,
+     * delete, get-and-replace, get-and-upsert, get-and-delete, delete-exact)
+     * within one transaction and, after every request, checks through
+     * {@code checkRowInMvStorage} whether the row is readable from the MV
+     * partition storage.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+
+        doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
+        checkRowInMvStorage(binaryRow(0), true);
+
+        // Same key as binaryRow(0), different value.
+        BinaryRow br = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v1"));
+        doSingleRowRequest(txId, br, RequestType.RW_UPSERT);
+        checkRowInMvStorage(br, true);
+
+        doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE);
+        checkRowInMvStorage(binaryRow(0), false);
+
+        doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
+        checkRowInMvStorage(binaryRow(0), true);
+
+        br = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v2"));
+        doSingleRowRequest(txId, br, RequestType.RW_GET_AND_REPLACE);
+        checkRowInMvStorage(br, true);
+
+        br = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v3"));
+        doSingleRowRequest(txId, br, RequestType.RW_GET_AND_UPSERT);
+        checkRowInMvStorage(br, true);
+
+        doSingleRowRequest(txId, br, RequestType.RW_GET_AND_DELETE);
+        checkRowInMvStorage(br, false);
+
+        doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
+        checkRowInMvStorage(binaryRow(0), true);
+        doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE_EXACT);
+        checkRowInMvStorage(binaryRow(0), false);
+
+        // The lock manager is a static field of the test class, so the locks
+        // of this transaction are released explicitly.
+        lockManager.locks(txId).forEachRemaining(lock -> 
lockManager.release(lock));
+    }
+
+    /**
+     * Sends a sequence of multi-row read-write requests (insert-all,
+     * upsert-all, delete-all, delete-exact-all) within one transaction and,
+     * after every request, checks through {@code checkRowInMvStorage} which
+     * of the rows are readable from the MV partition storage.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaMultiRowOps() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        BinaryRow row0 = binaryRow(0);
+        BinaryRow row1 = binaryRow(1);
+        Collection<BinaryRow> rows = asList(row0, row1);
+
+        doMultiRowRequest(txId, rows, RequestType.RW_INSERT_ALL);
+        checkRowInMvStorage(row0, true);
+        checkRowInMvStorage(row1, true);
+
+        // Same keys as row0/row1, different values.
+        BinaryRow newRow0 = binaryRow(new TestKey(0, "k0"), new TestValue(2, 
"v2"));
+        BinaryRow newRow1 = binaryRow(new TestKey(1, "k1"), new TestValue(3, 
"v3"));
+        Collection<BinaryRow> newRows = asList(newRow0, newRow1);
+        doMultiRowRequest(txId, newRows, RequestType.RW_UPSERT_ALL);
+        checkRowInMvStorage(row0, false);
+        checkRowInMvStorage(row1, false);
+        checkRowInMvStorage(newRow0, true);
+        checkRowInMvStorage(newRow1, true);
+
+        doMultiRowRequest(txId, newRows, RequestType.RW_DELETE_ALL);
+        checkRowInMvStorage(row0, false);
+        checkRowInMvStorage(row1, false);
+        checkRowInMvStorage(newRow0, false);
+        checkRowInMvStorage(newRow1, false);
+
+        doMultiRowRequest(txId, rows, RequestType.RW_INSERT_ALL);
+        checkRowInMvStorage(row0, true);
+        checkRowInMvStorage(row1, true);
+        doMultiRowRequest(txId, rows, RequestType.RW_DELETE_EXACT_ALL);
+        checkRowInMvStorage(row0, false);
+        checkRowInMvStorage(row1, false);
+
+        // The lock manager is a static field of the test class, so the locks
+        // of this transaction are released explicitly.
+        lockManager.locks(txId).forEachRemaining(lock -> 
lockManager.release(lock));
+    }
+
+    /**
+     * Builds a single-row read-write replica request and passes it to the
+     * partition replica listener. The future returned by the listener is
+     * not awaited or checked here.
+     *
+     * @param txId Transaction id to put into the request.
+     * @param binaryRow Row to put into the request.
+     * @param requestType Type of the read-write operation.
+     */
+    private void doSingleRowRequest(UUID txId, BinaryRow binaryRow, 
RequestType requestType) {
+        
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+                .transactionId(txId)
+                .requestType(requestType)
+                .binaryRow(binaryRow)
+                .term(1L)
+                // A random table id is used for the commit partition.
+                .commitPartitionId(new TablePartitionId(UUID.randomUUID(), 
partId))
+                .build()
+        );
+    }
+
+    /**
+     * Builds a multi-row read-write replica request and passes it to the
+     * partition replica listener. The future returned by the listener is
+     * not awaited or checked here.
+     *
+     * @param txId Transaction id to put into the request.
+     * @param binaryRows Rows to put into the request.
+     * @param requestType Type of the read-write operation.
+     */
+    private void doMultiRowRequest(UUID txId, Collection<BinaryRow> 
binaryRows, RequestType requestType) {
+        
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+                .transactionId(txId)
+                .requestType(requestType)
+                .binaryRows(binaryRows)
+                .term(1L)
+                // A random table id is used for the commit partition.
+                .commitPartitionId(new TablePartitionId(UUID.randomUUID(), 
partId))
+                .build()
+        );
+    }
+
+    /**
+     * Runs the shared {@code testWriteIntentOnPrimaryReplica} scenario with
+     * single-row {@code RW_INSERT} requests. Every request produced by the
+     * supplier carries a row built from the next counter value, so the first
+     * request inserts {@code binaryRow(0)}.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaSingleUpdate() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        AtomicInteger counter = new AtomicInteger();
+
+        testWriteIntentOnPrimaryReplica(
+                txId,
+                () -> {
+                    BinaryRow binaryRow = binaryRow(counter.getAndIncrement());
+
+                    return 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+                            .transactionId(txId)
+                            .requestType(RequestType.RW_INSERT)
+                            .binaryRow(binaryRow)
+                            .term(1L)
+                            .commitPartitionId(new 
TablePartitionId(UUID.randomUUID(), partId))
+                            .build();
+                },
+                () -> checkRowInMvStorage(binaryRow(0), true)
+        );
+
+        // The lock manager is a static field of the test class, so the locks
+        // of this transaction are released explicitly.
+        lockManager.locks(txId).forEachRemaining(lock -> 
lockManager.release(lock));
+    }
+
+    /**
+     * Runs the shared {@code testWriteIntentOnPrimaryReplica} scenario with
+     * multi-row {@code RW_UPSERT_ALL} requests. Every request produced by the
+     * supplier carries two rows built from {@code 2 * n} and {@code 2 * n + 1},
+     * where {@code n} is the next counter value, so the first request carries
+     * {@code binaryRow(0)} and {@code binaryRow(1)}.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaUpdateAll() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        AtomicInteger counter = new AtomicInteger();
+
+        testWriteIntentOnPrimaryReplica(
+                txId,
+                () -> {
+                    int cntr = counter.getAndIncrement();
+                    BinaryRow binaryRow0 = binaryRow(cntr * 2);
+                    BinaryRow binaryRow1 = binaryRow(cntr * 2 + 1);
+
+                    return 
TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+                            .transactionId(txId)
+                            .requestType(RequestType.RW_UPSERT_ALL)
+                            .binaryRows(asList(binaryRow0, binaryRow1))
+                            .term(1L)
+                            .commitPartitionId(new 
TablePartitionId(UUID.randomUUID(), partId))
+                            .build();
+                },
+                () -> checkRowInMvStorage(binaryRow(0), true)
+        );
+
+        // The lock manager is a static field of the test class, so the locks
+        // of this transaction are released explicitly.
+        lockManager.locks(txId).forEachRemaining(lock -> 
lockManager.release(lock));
+    }
+
+    /**
+     * Asserts that the given row is, or is not, readable from the MV partition storage.
+     *
+     * <p>Row ids are looked up in the primary key index by the row's key. Every row id
+     * is read at {@link HybridTimestamp#MAX_VALUE} and its bytes are compared with the
+     * expected row.
+     *
+     * @param binaryRow Row to look for.
+     * @param shouldBePresent {@code true} if some row id must resolve to exactly this row,
+     *      {@code false} if no row id may resolve to it. A key with no index entries
+     *      at all counts as absent.
+     */
+    private void checkRowInMvStorage(BinaryRow binaryRow, boolean shouldBePresent) {
+        TableRow tableRow = tableRow(binaryRow);
+        Cursor<RowId> cursor = pkStorage.get().get(binaryRow);
+
+        boolean found = false;
+
+        // The key may map to several row ids (there can be write intents for deletion)
+        // or to none, so all of them are inspected rather than only the first one.
+        while (cursor.hasNext()) {
+            RowId rowId = cursor.next();
+
+            TableRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).tableRow();
+
+            if (row != null && Arrays.equals(tableRow.bytes(), row.bytes())) {
+                found = true;
+            }
+        }
+
+        assertEquals(shouldBePresent, found);
+    }
+
+    /**
+     * Shared scenario for the write-intent tests.
+     *
+     * <ol>
+     *     <li>Invokes one updating request and runs the supplied check.</li>
+     *     <li>Makes the mocked raft client return a future that is not
+     *     completed, invokes a second updating request and asserts that its
+     *     future is not done.</li>
+     *     <li>Stores a {@code COMMITED} tx meta, invokes a tx cleanup request
+     *     and asserts that it is not done until the pending raft future is
+     *     completed, after which it must succeed.</li>
+     *     <li>Invokes one more updating request and expects it to fail with
+     *     {@link TransactionException}.</li>
+     * </ol>
+     *
+     * @param txId Transaction id used by the requests.
+     * @param updatingRequestSupplier Supplier that creates a new updating
+     *      request on every call.
+     * @param checkAfterFirstOperation Check to run right after the first
+     *      request was invoked.
+     */
+    private void testWriteIntentOnPrimaryReplica(
+            UUID txId,
+            Supplier<ReadWriteReplicaRequest> updatingRequestSupplier,
+            Runnable checkAfterFirstOperation
+    ) {
+        partitionReplicaListener.invoke(updatingRequestSupplier.get());
+        checkAfterFirstOperation.run();
+
+        // Check that cleanup request processing awaits all write requests.
+        CompletableFuture<?> writeFut = new CompletableFuture<>();
+
+        // From now on mockRaftClient.run() answers with the pending future.
+        raftClientFutureSupplier = () -> writeFut;
+
+        try {
+            CompletableFuture<?> replicaWriteFut = 
partitionReplicaListener.invoke(updatingRequestSupplier.get());
+
+            assertFalse(replicaWriteFut.isDone());
+
+            // Later raft calls get a completed future again; presumably so
+            // that the cleanup's own raft command is not blocked - confirm.
+            raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+
+            HybridTimestamp now = clock.now();
+
+            // Imitation of tx commit.
+            txStateStorage.put(txId, new TxMeta(TxState.COMMITED, new 
ArrayList<>(), now));
+
+            CompletableFuture<?> replicaCleanupFut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
+                    .txId(txId)
+                    .commit(true)
+                    .commitTimestamp(now)
+                    .term(1L)
+                    .build()
+            );
+
+            assertFalse(replicaCleanupFut.isDone());
+
+            writeFut.complete(null);
+
+            assertThat(replicaCleanupFut, willSucceedFast());
+        } finally {
+            // The supplier is a static field; restore it even if an
+            // assertion above fails.
+            raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+        }
+
+        // Check that one more write after cleanup is discarded.
+        CompletableFuture<?> writeAfterCleanupFuture = 
partitionReplicaListener.invoke(updatingRequestSupplier.get());
+        assertThat(writeAfterCleanupFuture, 
willFailFast(TransactionException.class));
+    }
+
     private static BinaryTuplePrefix toIndexBound(int val) {
         ByteBuffer tuple = new BinaryTuplePrefixBuilder(1, 
1).appendInt(val).build();
 
@@ -783,10 +1030,22 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         }
     }
 
+    /**
+     * Converts a binary row to a table row using the test's row converter.
+     *
+     * @param binaryRow Binary row to convert.
+     * @return Table row produced by {@code TableRowConverter.fromBinaryRow}.
+     */
+    protected static TableRow tableRow(BinaryRow binaryRow) {
+        return TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+    }
+
     private static TableRow tableRow(TestKey key, TestValue value) {
         return TableRowConverter.fromBinaryRow(binaryRow(key, value), 
rowConverter);
     }
 
+    /**
+     * Marshals a row whose key is {@code TestKey(i, "k" + i)} and whose value
+     * is {@code TestValue(i, "v" + i)}.
+     *
+     * @param i Number used to build both the key and the value.
+     * @return Marshalled binary row.
+     * @throws IgniteException If marshalling fails; wraps the
+     *      {@code MarshallerException}.
+     */
+    protected static BinaryRow binaryRow(int i) {
+        try {
+            return kvMarshaller.marshal(new TestKey(i, "k" + i), new 
TestValue(i, "v" + i));
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
     private static BinaryRow binaryRow(TestKey key, TestValue value) {
         try {
             return kvMarshaller.marshal(key, value);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 5a356045f7..a4f0a18045 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -115,6 +115,8 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
 
     protected IgniteTransactions igniteTransactions;
 
+    protected TxManager clientTxManager;
+
     /**
      * Initialize the test state.
      */
@@ -290,7 +292,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         assertEquals(BALANCE_1 - DELTA, view.get(null, 
makeKey(1)).doubleValue("balance"));
         assertEquals(BALANCE_2 + DELTA, view.get(null, 
makeKey(2)).doubleValue("balance"));
 
-        assertEquals(5, txManager(accounts).finished());
+        assertEquals(5, clientTxManager().finished());
     }
 
     /**
@@ -314,7 +316,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
         assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(null, 
makeKey(2)).doubleValue("balance"));
 
-        assertEquals(5, txManager(accounts).finished());
+        assertEquals(5, clientTxManager().finished());
     }
 
     /**
@@ -1642,6 +1644,13 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         return Tuple.create().set("name", name);
     }
 
+    /**
+     * Get a client tx manager. The tests in this class read its
+     * {@code finished()} counter to verify the number of finished
+     * transactions.
+     *
+     * @return TX manager.
+     */
+    protected abstract TxManager clientTxManager();
+
     /**
      * Get a tx manager on a partition leader.
      *
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 6e82ef55e6..3346feb172 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -58,7 +59,9 @@ import 
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.table.distributed.HashIndexLocker;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
@@ -84,6 +87,8 @@ import org.jetbrains.annotations.Nullable;
 public class DummyInternalTableImpl extends InternalTableImpl {
     public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1", 
2004);
 
+    private static final int PART_ID = 0;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
@@ -164,7 +169,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         super(
                 "test",
                 UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
+                Int2ObjectMaps.singleton(PART_ID, 
mock(RaftGroupService.class)),
                 1,
                 name -> mock(ClusterNode.class),
                 txManager == null ? new TxManagerImpl(replicaSvc, new 
HeapLockManager(), new HybridClockImpl()) : txManager,
@@ -175,7 +180,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         );
         RaftGroupService svc = partitionMap.get(0);
 
-        groupId = crossTableUsage ? new TablePartitionId(tableId(), 0) : 
crossTableGroupId;
+        groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : 
crossTableGroupId;
 
         lenient().doReturn(groupId).when(svc).groupId();
         Peer leaderPeer = new Peer(UUID.randomUUID().toString());
@@ -260,34 +265,36 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         HybridClock clock = new HybridClockImpl();
         PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(clock.now());
+        PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartStorage);
+        Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> 
Map.of(pkStorage.get().id(), pkStorage.get());
+        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(PART_ID, partitionDataStorage, indexes);
 
         DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schema);
 
         replicaListener = new PartitionReplicaListener(
                 mvPartStorage,
-                partitionMap.get(0),
+                partitionMap.get(PART_ID),
                 this.txManager,
                 this.txManager.lockManager(),
                 Runnable::run,
-                0,
+                PART_ID,
                 tableId,
                 () -> Map.of(pkLocker.id(), pkLocker),
                 pkStorage,
                 () -> Map.of(),
                 clock,
                 safeTime,
-                txStateStorage().getOrCreateTxStateStorage(0),
+                txStateStorage().getOrCreateTxStateStorage(PART_ID),
                 placementDriver,
+                storageUpdateHandler,
                 peer -> true,
                 CompletableFuture.completedFuture(schemaManager)
         );
 
         partitionListener = new PartitionListener(
                 new TestPartitionDataStorage(mvPartStorage),
-                txStateStorage().getOrCreateTxStateStorage(0),
-                this.txManager,
-                () -> Map.of(pkStorage.get().id(), pkStorage.get()),
-                0,
+                storageUpdateHandler,
+                txStateStorage().getOrCreateTxStateStorage(PART_ID),
                 safeTime
         );
     }

Reply via email to