This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1232ff1dd65 IGNITE-27319 Rework PartitionListener to be 
TablePartitionProcessor and not RaftGroupListener (#7218)
1232ff1dd65 is described below

commit 1232ff1dd651e90c27810f95cd150b21e8e1e421
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Dec 15 13:13:12 2025 +0200

    IGNITE-27319 Rework PartitionListener to be TablePartitionProcessor and not 
RaftGroupListener (#7218)
---
 .../ItZonePartitionRaftListenerRecoveryTest.java   |   4 +-
 .../PartitionReplicaLifecycleManager.java          |   4 +-
 .../raft/ZonePartitionRaftListenerTest.java        |  12 +-
 .../ItTxObservableTimePropagationTest.java         |   8 +-
 .../ReplicasSafeTimePropagationTest.java           |  36 ++-
 .../internal/table/distributed/TableManager.java   |   4 +-
 ...nListener.java => TablePartitionProcessor.java} |  89 +------
 .../distributed/raft/handlers/package-info.java    |   2 +-
 .../raft/PartitionCommandListenerTest.java         | 289 +++++----------------
 .../apache/ignite/distributed/ItTxTestCluster.java |   4 +-
 .../internal/table/TxInfrastructureTest.java       |   8 +-
 .../table/impl/DummyInternalTableImpl.java         |   4 +-
 12 files changed, 104 insertions(+), 360 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index 51516a437c7..8cf057178f0 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -110,7 +110,7 @@ import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -394,7 +394,7 @@ class ItZonePartitionRaftListenerRecoveryTest extends 
IgniteAbstractTest {
         ClockService clockService = mock(ClockService.class);
         lenient().when(clockService.current()).thenReturn(clock.current());
 
-        return new PartitionListener(
+        return new TablePartitionProcessor(
                 txManager,
                 storage,
                 storageUpdateHandler,
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 06b6f5d5335..ad67418c33f 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -57,6 +57,8 @@ import static 
org.apache.ignite.internal.partitiondistribution.Assignments.assig
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
 import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
+import static 
org.apache.ignite.internal.raft.RaftGroupConfiguration.UNKNOWN_INDEX;
+import static 
org.apache.ignite.internal.raft.RaftGroupConfiguration.UNKNOWN_TERM;
 import static 
org.apache.ignite.internal.tostring.IgniteToStringBuilder.COLLECTION_LIMIT;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -920,7 +922,7 @@ public class PartitionReplicaLifecycleManager extends
 
             if (haMode) {
                 ByteArray assignmentsChainKey = 
assignmentsChainKey(zonePartitionId);
-                byte[] assignmentChain = 
AssignmentsChain.of(newAssignments.get(i)).toBytes();
+                byte[] assignmentChain = AssignmentsChain.of(UNKNOWN_TERM, 
UNKNOWN_INDEX, newAssignments.get(i)).toBytes();
                 Operation chainOp = put(assignmentsChainKey, assignmentChain);
                 partitionAssignments.add(chainOp);
             }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 794221e0d15..8c6617ab460 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -80,7 +80,7 @@ import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -266,9 +266,9 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
 
         listener.onConfigurationCommitted(raftGroupConfiguration, 2L, 3L);
 
-        PartitionListener partitionListener = partitionListener(TABLE_ID);
+        TablePartitionProcessor tablePartitionProcessor = 
partitionListener(TABLE_ID);
 
-        listener.addTableProcessor(TABLE_ID, partitionListener);
+        listener.addTableProcessor(TABLE_ID, tablePartitionProcessor);
 
         verify(mvPartitionStorage).lastApplied(2L, 3L);
         verify(mvPartitionStorage).committedGroupConfiguration(any());
@@ -410,7 +410,7 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
     void testSkipWriteCommandByAppliedIndex() {
         mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID));
 
-        PartitionListener tableProcessor = partitionListener(TABLE_ID);
+        TablePartitionProcessor tableProcessor = partitionListener(TABLE_ID);
 
         listener.addTableProcessor(TABLE_ID, tableProcessor);
         // Update(All)Command handling requires both information about raft 
group topology and the primary replica,
@@ -552,14 +552,14 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         return commandClosure;
     }
 
-    private PartitionListener partitionListener(int tableId) {
+    private TablePartitionProcessor partitionListener(int tableId) {
         LeasePlacementDriver placementDriver = 
mock(LeasePlacementDriver.class);
         lenient().when(placementDriver.getCurrentPrimaryReplica(any(), 
any())).thenReturn(null);
 
         ClockService clockService = mock(ClockService.class);
         lenient().when(clockService.current()).thenReturn(clock.current());
 
-        return new PartitionListener(
+        return new TablePartitionProcessor(
                 txManager,
                 new SnapshotAwarePartitionDataStorage(
                         tableId,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
index 4de1eebe1be..b773d40dc08 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
@@ -39,7 +39,7 @@ import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingSta
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.TxInfrastructureTest;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -138,7 +138,7 @@ public class ItTxObservableTimePropagationTest extends 
TxInfrastructureTest {
                 }
 
                 var fsm = (JraftServerImpl.DelegatingStateMachine) 
raftNode.getOptions().getFsm();
-                PartitionListener listener = extractPartitionListener(fsm, 
accounts);
+                TablePartitionProcessor listener = 
extractPartitionListener(fsm, accounts);
                 PendingComparableValuesTracker<HybridTimestamp, Void> safeTime 
= listener.getSafeTimeTracker();
 
                 try {
@@ -172,7 +172,7 @@ public class ItTxObservableTimePropagationTest extends 
TxInfrastructureTest {
         assertTrue(commitTs2.compareTo(commitTs) > 0, "Invalid safe time");
     }
 
-    private static PartitionListener 
extractPartitionListener(DelegatingStateMachine fsm, TableViewInternal table) {
-        return (PartitionListener) ((ZonePartitionRaftListener) 
fsm.getListener()).tableProcessor(table.tableId());
+    private static TablePartitionProcessor 
extractPartitionListener(DelegatingStateMachine fsm, TableViewInternal table) {
+        return (TablePartitionProcessor) ((ZonePartitionRaftListener) 
fsm.getListener()).tableProcessor(table.tableId());
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index 6146e5a1191..4b215b7f6aa 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -43,12 +43,12 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.TestHybridClock;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
 import org.apache.ignite.internal.configuration.ComponentWorkingDir;
 import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -59,7 +59,8 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.StaticNodeFinder;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import 
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
@@ -77,14 +78,11 @@ import 
org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
-import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import 
org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
 import org.apache.ignite.internal.util.SafeTimeValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -383,23 +381,23 @@ public class ReplicasSafeTimePropagationTest extends 
IgniteAbstractTest {
             ClockService clockService = mock(ClockService.class);
             when(clockService.current()).thenReturn(clock.current());
 
+            OutgoingSnapshotsManager outgoingSnapshotsManager = new 
OutgoingSnapshotsManager(
+                    clusterService.nodeName(),
+                    clusterService.messagingService(),
+                    new NoOpFailureManager()
+            );
+
             this.raftClient = raftManager.startRaftGroupNode(
                     new RaftNodeId(GROUP_ID, new Peer(nodeName)),
                     fromConsistentIds(cluster.keySet()),
-                    new PartitionListener(
+                    new ZonePartitionRaftListener(
+                            GROUP_ID,
+                            mock(TxStatePartitionStorage.class),
                             txManagerMock,
-                            mock(PartitionDataStorage.class),
-                            mock(StorageUpdateHandler.class),
                             safeTs,
-                            mock(CatalogService.class),
-                            mock(SchemaRegistry.class),
-                            mock(IndexMetaStorage.class),
-                            
clusterService.topologyService().localMember().id(),
-                            mock(MinimumRequiredTimeCollectorService.class),
-                            mock(Executor.class),
-                            placementDriver,
-                            clockService,
-                            GROUP_ID
+                            
mock(PendingIndependentComparableValuesTracker.class),
+                            outgoingSnapshotsManager,
+                            mock(Executor.class)
                     ) {
                         @Override
                         public void 
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b6c672be1f0..928f430d891 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -166,7 +166,7 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
@@ -933,7 +933,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 raftClient
         );
 
-        var tablePartitionRaftListener = new PartitionListener(
+        var tablePartitionRaftListener = new TablePartitionProcessor(
                 txManager,
                 partitionDataStorage,
                 partitionUpdateHandlers.storageUpdateHandler,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
similarity index 86%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index 10eb40d2cbb..c88fa1de9c6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -30,19 +30,14 @@ import static 
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAt
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 
-import java.nio.file.Path;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
@@ -55,15 +50,10 @@ import 
org.apache.ignite.internal.partition.replicator.raft.handlers.CommandHand
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
-import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.service.CommandClosure;
-import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
-import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -85,11 +75,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Partition command handler.
  */
-// TODO ignite-22522 Rename to TablePartitionProcessor and remove implements 
RaftGroupListener
-public class PartitionListener implements RaftGroupListener, 
RaftTableProcessor {
-    /** Logger. */
-    private static final IgniteLogger LOG = 
Loggers.forClass(PartitionListener.class);
-
+public class TablePartitionProcessor implements RaftTableProcessor {
     /** Transaction manager. */
     private final TxManager txManager;
 
@@ -123,7 +109,7 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
     private ReplicaMeta lastKnownLease;
 
     /** Constructor. */
-    public PartitionListener(
+    public TablePartitionProcessor(
             TxManager txManager,
             PartitionDataStorage partitionDataStorage,
             StorageUpdateHandler storageUpdateHandler,
@@ -195,66 +181,6 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         }
     }
 
-    @Override
-    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
-        iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo) 
-> {
-            Command command = clo.command();
-
-            assert false : "No read commands expected, [cmd=" + command + ']';
-        });
-    }
-
-    @Override
-    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
-        iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) 
-> {
-            WriteCommand command = clo.command();
-
-            long commandIndex = clo.index();
-            long commandTerm = clo.term();
-            @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
-            assert safeTimestamp == null || command instanceof 
SafeTimePropagatingCommand : command;
-
-            long storagesAppliedIndex = storage.lastAppliedIndex();
-
-            assert commandIndex > storagesAppliedIndex :
-                    "Write command must have an index greater than that of 
storages [commandIndex=" + commandIndex
-                            + ", mvAppliedIndex=" + storage.lastAppliedIndex() 
+ "]";
-
-            CommandResult result;
-
-            // NB: Make sure that ANY command we accept here updates 
lastAppliedIndex+term info in one of the underlying
-            // storages!
-            // Otherwise, a gap between lastAppliedIndex from the point of 
view of JRaft and our storage might appear.
-            // If a leader has such a gap, and does doSnapshot(), it will 
subsequently truncate its log too aggressively
-            // in comparison with 'snapshot' state stored in our storages; and 
if we install a snapshot from our storages
-            // to a follower at this point, for a subsequent AppendEntries the 
leader will not be able to get prevLogTerm
-            // (because it's already truncated in the leader's log), so it 
will have to install a snapshot again, and then
-            // repeat same thing over and over again.
-
-            storage.acquirePartitionSnapshotsReadLock();
-
-            try {
-                result = processCommand(command, commandIndex, commandTerm, 
safeTimestamp);
-            } catch (Throwable t) {
-                LOG.error(
-                        "Got error while processing command [commandIndex={}, 
commandTerm={}, command={}]",
-                        t,
-                        clo.index(), clo.index(), command
-                );
-
-                clo.result(t);
-
-                throw t;
-            } finally {
-                storage.releasePartitionSnapshotsReadLock();
-            }
-
-            // Completing the closure out of the partition snapshots lock to 
reduce possibility of deadlocks as it might
-            // trigger other actions taking same locks.
-            clo.result(result.result());
-        });
-    }
-
     @Override
     public CommandResult processCommand(
             WriteCommand command,
@@ -567,17 +493,6 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         currentGroupTopology.addAll(config.learners());
     }
 
-    @Override
-    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
-        throw new UnsupportedOperationException("!!! It's not expected that 
PartitionListener onSnapshotSave will be called.");
-
-    }
-
-    @Override
-    public boolean onSnapshotLoad(Path path) {
-        return true;
-    }
-
     @Override
     public void onShutdown() {
         storage.close();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
index 73bb636fa4c..f5336c82260 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
@@ -17,7 +17,7 @@
 
 /**
  * This package contains RAFT command handlers that is used by
- * {@link org.apache.ignite.internal.table.distributed.raft.PartitionListener} 
aka table raft processor.
+ * {@link 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor} aka 
table raft processor.
  */
 
 package org.apache.ignite.internal.table.distributed.raft.handlers;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 2403a7d2f1f..d64bef61ea6 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -22,11 +22,9 @@ import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.deriveUuidFrom;
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -34,7 +32,6 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
@@ -45,22 +42,18 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
@@ -81,15 +74,14 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.BuildInde
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
-import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
@@ -131,9 +123,6 @@ import 
org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.UpdateCommandResult;
-import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
-import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.Cursor;
@@ -143,8 +132,6 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -173,7 +160,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
     private static final SchemaRegistry SCHEMA_REGISTRY = new 
DummySchemaManagerImpl(SCHEMA);
 
-    private PartitionListener commandListener;
+    private TablePartitionProcessor commandListener;
 
     private final AtomicLong raftIndex = new AtomicLong();
 
@@ -196,8 +183,6 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
     private final PartitionDataStorage partitionDataStorage = spy(new 
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage));
 
-    private final TxStatePartitionStorage txStatePartitionStorage = spy(new 
TestTxStatePartitionStorage());
-
     @WorkDirectory
     private Path workDir;
 
@@ -210,12 +195,6 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
     private SafeTimeValuesTracker safeTimeTracker;
 
-    @Captor
-    private ArgumentCaptor<Throwable> commandClosureResultCaptor;
-
-    @Captor
-    private ArgumentCaptor<UpdateCommandResult> 
updateCommandClosureResultCaptor;
-
     private final RaftGroupConfigurationConverter 
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
 
     private IndexUpdateHandler indexUpdateHandler;
@@ -295,7 +274,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         ClockService clockService = mock(ClockService.class);
         
lenient().when(clockService.current()).thenReturn(hybridClock.current());
 
-        commandListener = new PartitionListener(
+        commandListener = new TablePartitionProcessor(
                 mock(TxManager.class),
                 partitionDataStorage,
                 storageUpdateHandler,
@@ -336,7 +315,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                     
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
                     .build();
 
-            
commandListener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(),
 1, command, null, null)).iterator());
+            commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, null);
         }
     }
 
@@ -410,44 +389,6 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         readAndCheck(false);
     }
 
-    private CommandClosure<WriteCommand> writeCommandCommandClosure(
-            long index,
-            long term,
-            WriteCommand writeCommand
-    ) {
-        return writeCommandCommandClosure(index, term, writeCommand, null, 
hybridClock.now());
-    }
-
-    /**
-     * Create a command closure.
-     *
-     * @param index Index of the RAFT command.
-     * @param term Term of RAFT command.
-     * @param writeCommand Write command.
-     * @param resultClosureCaptor Captor for {@link 
CommandClosure#result(Serializable)}
-     * @param safeTimestamp The safe timestamp.
-     */
-    private static CommandClosure<WriteCommand> writeCommandCommandClosure(
-            long index,
-            long term,
-            WriteCommand writeCommand,
-            @Nullable ArgumentCaptor<? extends Serializable> 
resultClosureCaptor,
-            @Nullable HybridTimestamp safeTimestamp
-    ) {
-        CommandClosure<WriteCommand> commandClosure = 
mock(CommandClosure.class);
-
-        when(commandClosure.index()).thenReturn(index);
-        when(commandClosure.term()).thenReturn(term);
-        when(commandClosure.command()).thenReturn(writeCommand);
-        when(commandClosure.safeTimestamp()).thenReturn(safeTimestamp);
-
-        if (resultClosureCaptor != null) {
-            
doNothing().when(commandClosure).result(resultClosureCaptor.capture());
-        }
-
-        return commandClosure;
-    }
-
     @Test
     void updatesLastAppliedForUpdateCommands() {
         safeTimeTracker.update(hybridClock.now(), null);
@@ -461,9 +402,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .initiatorTime(hybridClock.now())
                 .build();
 
-        commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 2, command)
-        ).iterator());
+        commandListener.processCommand(command, 3, 2, hybridClock.now());
 
         verify(mvPartitionStorage).lastApplied(3, 2);
     }
@@ -484,9 +423,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .initiatorTime(hybridClock.now())
                 .build();
 
-        commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 2, command)
-        ).iterator());
+        commandListener.processCommand(command, 3, 2, hybridClock.now());
 
         verify(mvPartitionStorage).lastApplied(3, 2);
     }
@@ -497,15 +434,12 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .safeTimeSyncCommand()
                 .initiatorTime(hybridClock.now());
 
-        commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), 
commandClosureResultCaptor, hybridClock.now())
-        ).iterator());
+        commandListener.processCommand(safeTimeSyncCommand.build(), 3, 2, 
hybridClock.now());
 
         InOrder inOrder = inOrder(partitionDataStorage);
 
         
inOrder.verify(partitionDataStorage).acquirePartitionSnapshotsReadLock();
         inOrder.verify(partitionDataStorage).lastApplied(3, 2);
-        
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
     }
 
     @Test
@@ -697,8 +631,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     private void applySafeTimeCommand(Class<? extends 
SafeTimePropagatingCommand> cls, HybridTimestamp timestamp) {
         SafeTimePropagatingCommand command = mock(cls);
 
-        CommandClosure<WriteCommand> closure = writeCommandCommandClosure(3, 
1, command, commandClosureResultCaptor, timestamp);
-        commandListener.onWrite(asList(closure).iterator());
+        commandListener.processCommand(command, 3, 1, timestamp);
         assertEquals(timestamp, safeTimeTracker.current());
     }
 
@@ -713,9 +646,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .safeTime(hybridClock.now())
                 .build();
 
-        commandListener.onWrite(List.of(
-                writeCommandCommandClosure(raftIndex.incrementAndGet(), 2, 
command)
-        ).iterator());
+        commandListener.processCommand(command, raftIndex.incrementAndGet(), 
2, hybridClock.now());
 
         verify(mvPartitionStorage).lastApplied(raftIndex.get(), 2);
     }
@@ -730,74 +661,11 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .safeTime(hybridClock.now())
                 .build();
 
-        commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 2, safeTimeSyncCommand)
-        ).iterator());
+        commandListener.processCommand(safeTimeSyncCommand, 3, 2, 
hybridClock.now());
 
         verify(mvPartitionStorage).lastApplied(3, 2);
     }
 
-    /**
-     * Prepares a closure iterator for a specific batch operation.
-     *
-     * @param func The function prepare a closure for the operation.
-     * @param <T> Type of the operation.
-     * @return Closure iterator.
-     */
-    private <T extends Command> Iterator<CommandClosure<T>> 
batchIterator(Consumer<CommandClosure<T>> func) {
-        return new Iterator<>() {
-            boolean moved;
-
-            @Override
-            public boolean hasNext() {
-                return !moved;
-            }
-
-            @Override
-            public CommandClosure<T> next() {
-                CommandClosure<T> clo = mock(CommandClosure.class);
-
-                func.accept(clo);
-
-                moved = true;
-
-                return clo;
-            }
-        };
-    }
-
-    /**
-     * Prepares a closure iterator for a specific operation.
-     *
-     * @param func The function prepare a closure for the operation.
-     * @param <T> Type of the operation.
-     * @return Closure iterator.
-     */
-    private <T extends Command> Iterator<CommandClosure<T>> 
iterator(BiConsumer<Integer, CommandClosure<T>> func) {
-        return new Iterator<>() {
-            /** Iteration. */
-            private int it = 0;
-
-            /** {@inheritDoc} */
-            @Override
-            public boolean hasNext() {
-                return it < KEY_COUNT;
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public CommandClosure<T> next() {
-                CommandClosure<T> clo = mock(CommandClosure.class);
-
-                func.accept(it, clo);
-
-                it++;
-
-                return clo;
-            }
-        };
-    }
-
     /**
      * Inserts all rows.
      */
@@ -841,7 +709,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     /**
      * Update values from the listener in the batch operation.
      *
-     * @param keyValueMapper Mep a value to update to the iter number.
+     * @param keyValueMapper Map a value to update to the iter number.
      */
     private void updateAll(Function<Integer, Integer> keyValueMapper) {
         UUID txId = TestTransactionIds.newTransactionId();
@@ -922,41 +790,33 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     /**
      * Update rows.
      *
-     * @param keyValueMapper Mep a value to update to the iter number.
+     * @param keyValueMapper Map a value to update to the iter number.
      */
     private void update(Function<Integer, Integer> keyValueMapper) {
         List<UUID> txIds = new ArrayList<>();
 
-        commandListener.onWrite(iterator((i, clo) -> {
+        for (int i = 0; i < KEY_COUNT; i++) {
             UUID txId = TestTransactionIds.newTransactionId();
             BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i));
             ReadResult readResult = readRow(getTestKey(i));
 
             txIds.add(txId);
 
-            when(clo.safeTimestamp()).thenReturn(hybridClock.now());
-            when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
-            when(clo.command()).thenReturn(
-                    PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
-                            .tableId(TABLE_ID)
-                            .commitPartitionId(defaultPartitionIdMessage())
-                            .rowUuid(readResult.rowId().uuid())
-                            
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
-                                    .binaryRowMessage(row)
-                                    .build())
-                            .txId(txId)
-                            .initiatorTime(hybridClock.now())
-                            .safeTime(hybridClock.now())
-                            .txCoordinatorId(UUID.randomUUID())
-                            .build());
-
-            doAnswer(invocation -> {
-                assertTrue(invocation.getArgument(0) instanceof 
UpdateCommandResult);
-
-                return null;
-            }).when(clo).result(any());
-        }));
+            UpdateCommandV2 command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+                    .tableId(TABLE_ID)
+                    .commitPartitionId(defaultPartitionIdMessage())
+                    .rowUuid(readResult.rowId().uuid())
+                    
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
+                            .binaryRowMessage(row)
+                            .build())
+                    .txId(txId)
+                    .initiatorTime(hybridClock.now())
+                    .safeTime(hybridClock.now())
+                    .txCoordinatorId(UUID.randomUUID())
+                    .build();
+
+            commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, hybridClock.now());
+        }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
 
@@ -983,32 +843,24 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     private void delete() {
         List<UUID> txIds = new ArrayList<>();
 
-        commandListener.onWrite(iterator((i, clo) -> {
+        for (int i = 0; i < KEY_COUNT; i++) {
             UUID txId = TestTransactionIds.newTransactionId();
             ReadResult readResult = readRow(getTestKey(i));
 
             txIds.add(txId);
 
-            when(clo.safeTimestamp()).thenReturn(hybridClock.now());
-            when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
-            when(clo.command()).thenReturn(
-                    PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
-                            .tableId(TABLE_ID)
-                            .commitPartitionId(defaultPartitionIdMessage())
-                            .rowUuid(readResult.rowId().uuid())
-                            .txId(txId)
-                            .initiatorTime(hybridClock.now())
-                            .safeTime(hybridClock.now())
-                            .txCoordinatorId(UUID.randomUUID())
-                            .build());
-
-            doAnswer(invocation -> {
-                assertTrue(invocation.getArgument(0) instanceof 
UpdateCommandResult);
+            UpdateCommandV2 command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+                    .tableId(TABLE_ID)
+                    .commitPartitionId(defaultPartitionIdMessage())
+                    .rowUuid(readResult.rowId().uuid())
+                    .txId(txId)
+                    .initiatorTime(hybridClock.now())
+                    .safeTime(hybridClock.now())
+                    .txCoordinatorId(UUID.randomUUID())
+                    .build();
 
-                return null;
-            }).when(clo).result(any());
-        }));
+            commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, hybridClock.now());
+        }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
 
@@ -1061,33 +913,25 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     private void insert() {
         List<UUID> txIds = new ArrayList<>();
 
-        commandListener.onWrite(iterator((i, clo) -> {
-            UUID txId = TestTransactionIds.newTransactionId();
-            txIds.add(txId);
+        for (int i = 0; i < KEY_COUNT; i++) {
+            UUID txId0 = TestTransactionIds.newTransactionId();
+            txIds.add(txId0);
+
+            UpdateCommandV2 command = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+                    .tableId(TABLE_ID)
+                    .commitPartitionId(defaultPartitionIdMessage())
+                    .rowUuid(UUID.randomUUID())
+                    
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
+                            .binaryRowMessage(getTestRow(i, i))
+                            .build())
+                    .txId(txId0)
+                    .initiatorTime(hybridClock.now())
+                    .safeTime(hybridClock.now())
+                    .txCoordinatorId(UUID.randomUUID())
+                    .build();
 
-            when(clo.safeTimestamp()).thenReturn(hybridClock.now());
-            when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
-            when(clo.command()).thenReturn(
-                    PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
-                            .tableId(TABLE_ID)
-                            .commitPartitionId(defaultPartitionIdMessage())
-                            .rowUuid(UUID.randomUUID())
-                            
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
-                                    .binaryRowMessage(getTestRow(i, i))
-                                    .build())
-                            .txId(txId)
-                            .initiatorTime(hybridClock.now())
-                            .safeTime(hybridClock.now())
-                            .txCoordinatorId(UUID.randomUUID())
-                            .build());
-
-            doAnswer(invocation -> {
-                assertTrue(invocation.getArgument(0) instanceof 
UpdateCommandResult);
-
-                return null;
-            }).when(clo).result(any());
-        }));
+            commandListener.processCommand(command, 
raftIndex.incrementAndGet(), 1, hybridClock.now());
+        }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
 
@@ -1135,22 +979,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     }
 
     private void invokeBatchedCommand(WriteCommand cmd) {
-        commandListener.onWrite(batchIterator(clo -> {
-            when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
-            doAnswer(invocation -> {
-                if (cmd instanceof UpdateCommand || cmd instanceof 
UpdateAllCommand) {
-                    assertTrue(invocation.getArgument(0) instanceof 
UpdateCommandResult);
-                } else {
-                    assertNull(invocation.getArgument(0));
-                }
-
-                return null;
-            }).when(clo).result(any());
-
-            when(clo.safeTimestamp()).thenReturn(hybridClock.now());
-            when(clo.command()).thenReturn(cmd);
-        }));
+        commandListener.processCommand(cmd, raftIndex.incrementAndGet(), 1, 
hybridClock.now());
     }
 
     private @Nullable ReadResult readRow(BinaryTuple pk) {
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 41875bd2bfe..2ec4dea377d 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -156,7 +156,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
@@ -891,7 +891,7 @@ public class ItTxTestCluster {
                 )
         );
 
-        PartitionListener tablePartitionRaftListener = new PartitionListener(
+        TablePartitionProcessor tablePartitionRaftListener = new 
TablePartitionProcessor(
                 txManagers.get(assignment),
                 partitionDataStorage,
                 storageUpdateHandler,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
index e2411867a9e..9a4cb38b1f0 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
@@ -56,7 +56,7 @@ import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -299,11 +299,11 @@ public abstract class TxInfrastructureTest extends 
IgniteAbstractTest {
 
             var fsm = (JraftServerImpl.DelegatingStateMachine) 
grp.getRaftNode().getOptions().getFsm();
 
-            PartitionListener listener;
+            TablePartitionProcessor listener;
             if (colocationEnabled()) {
-                listener = (PartitionListener) ((ZonePartitionRaftListener) 
fsm.getListener()).tableProcessor(table.tableId());
+                listener = (TablePartitionProcessor) 
((ZonePartitionRaftListener) fsm.getListener()).tableProcessor(table.tableId());
             } else {
-                listener = (PartitionListener) fsm.getListener();
+                listener = (TablePartitionProcessor) fsm.getListener();
             }
 
             MvPartitionStorage storage = listener.getMvStorage();
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0d9d6d4872c..0caf6b7f99d 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -123,7 +123,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -523,7 +523,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         lenient().when(clockService.current()).thenReturn(clock.current());
 
         PendingComparableValuesTracker<Long, Void> storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
-        var tablePartitionListener = new PartitionListener(
+        var tablePartitionListener = new TablePartitionProcessor(
                 this.txManager,
                 new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage),
                 storageUpdateHandler,

Reply via email to