This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1232ff1dd65 IGNITE-27319 Rework PartitionListener to be
TablePartitionProcessor and not RaftGroupListener (#7218)
1232ff1dd65 is described below
commit 1232ff1dd651e90c27810f95cd150b21e8e1e421
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Dec 15 13:13:12 2025 +0200
IGNITE-27319 Rework PartitionListener to be TablePartitionProcessor and not
RaftGroupListener (#7218)
---
.../ItZonePartitionRaftListenerRecoveryTest.java | 4 +-
.../PartitionReplicaLifecycleManager.java | 4 +-
.../raft/ZonePartitionRaftListenerTest.java | 12 +-
.../ItTxObservableTimePropagationTest.java | 8 +-
.../ReplicasSafeTimePropagationTest.java | 36 ++-
.../internal/table/distributed/TableManager.java | 4 +-
...nListener.java => TablePartitionProcessor.java} | 89 +------
.../distributed/raft/handlers/package-info.java | 2 +-
.../raft/PartitionCommandListenerTest.java | 289 +++++----------------
.../apache/ignite/distributed/ItTxTestCluster.java | 4 +-
.../internal/table/TxInfrastructureTest.java | 8 +-
.../table/impl/DummyInternalTableImpl.java | 4 +-
12 files changed, 104 insertions(+), 360 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index 51516a437c7..8cf057178f0 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -110,7 +110,7 @@ import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -394,7 +394,7 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(clock.current());
- return new PartitionListener(
+ return new TablePartitionProcessor(
txManager,
storage,
storageUpdateHandler,
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 06b6f5d5335..ad67418c33f 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -57,6 +57,8 @@ import static
org.apache.ignite.internal.partitiondistribution.Assignments.assig
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
+import static
org.apache.ignite.internal.raft.RaftGroupConfiguration.UNKNOWN_INDEX;
+import static
org.apache.ignite.internal.raft.RaftGroupConfiguration.UNKNOWN_TERM;
import static
org.apache.ignite.internal.tostring.IgniteToStringBuilder.COLLECTION_LIMIT;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -920,7 +922,7 @@ public class PartitionReplicaLifecycleManager extends
if (haMode) {
ByteArray assignmentsChainKey =
assignmentsChainKey(zonePartitionId);
- byte[] assignmentChain =
AssignmentsChain.of(newAssignments.get(i)).toBytes();
+ byte[] assignmentChain = AssignmentsChain.of(UNKNOWN_TERM,
UNKNOWN_INDEX, newAssignments.get(i)).toBytes();
Operation chainOp = put(assignmentsChainKey, assignmentChain);
partitionAssignments.add(chainOp);
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 794221e0d15..8c6617ab460 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -80,7 +80,7 @@ import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -266,9 +266,9 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
listener.onConfigurationCommitted(raftGroupConfiguration, 2L, 3L);
- PartitionListener partitionListener = partitionListener(TABLE_ID);
+ TablePartitionProcessor tablePartitionProcessor =
partitionListener(TABLE_ID);
- listener.addTableProcessor(TABLE_ID, partitionListener);
+ listener.addTableProcessor(TABLE_ID, tablePartitionProcessor);
verify(mvPartitionStorage).lastApplied(2L, 3L);
verify(mvPartitionStorage).committedGroupConfiguration(any());
@@ -410,7 +410,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
void testSkipWriteCommandByAppliedIndex() {
mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID));
- PartitionListener tableProcessor = partitionListener(TABLE_ID);
+ TablePartitionProcessor tableProcessor = partitionListener(TABLE_ID);
listener.addTableProcessor(TABLE_ID, tableProcessor);
// Update(All)Command handling requires both information about raft
group topology and the primary replica,
@@ -552,14 +552,14 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
return commandClosure;
}
- private PartitionListener partitionListener(int tableId) {
+ private TablePartitionProcessor partitionListener(int tableId) {
LeasePlacementDriver placementDriver =
mock(LeasePlacementDriver.class);
lenient().when(placementDriver.getCurrentPrimaryReplica(any(),
any())).thenReturn(null);
ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(clock.current());
- return new PartitionListener(
+ return new TablePartitionProcessor(
txManager,
new SnapshotAwarePartitionDataStorage(
tableId,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
index 4de1eebe1be..b773d40dc08 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
@@ -39,7 +39,7 @@ import
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingSta
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.TxInfrastructureTest;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -138,7 +138,7 @@ public class ItTxObservableTimePropagationTest extends
TxInfrastructureTest {
}
var fsm = (JraftServerImpl.DelegatingStateMachine)
raftNode.getOptions().getFsm();
- PartitionListener listener = extractPartitionListener(fsm,
accounts);
+ TablePartitionProcessor listener =
extractPartitionListener(fsm, accounts);
PendingComparableValuesTracker<HybridTimestamp, Void> safeTime
= listener.getSafeTimeTracker();
try {
@@ -172,7 +172,7 @@ public class ItTxObservableTimePropagationTest extends
TxInfrastructureTest {
assertTrue(commitTs2.compareTo(commitTs) > 0, "Invalid safe time");
}
- private static PartitionListener
extractPartitionListener(DelegatingStateMachine fsm, TableViewInternal table) {
- return (PartitionListener) ((ZonePartitionRaftListener)
fsm.getListener()).tableProcessor(table.tableId());
+ private static TablePartitionProcessor
extractPartitionListener(DelegatingStateMachine fsm, TableViewInternal table) {
+ return (TablePartitionProcessor) ((ZonePartitionRaftListener)
fsm.getListener()).tableProcessor(table.tableId());
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index 6146e5a1191..4b215b7f6aa 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -43,12 +43,12 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.TestHybridClock;
-import org.apache.ignite.internal.catalog.CatalogService;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -59,7 +59,8 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
@@ -77,14 +78,11 @@ import
org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
-import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import
org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -383,23 +381,23 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
ClockService clockService = mock(ClockService.class);
when(clockService.current()).thenReturn(clock.current());
+ OutgoingSnapshotsManager outgoingSnapshotsManager = new
OutgoingSnapshotsManager(
+ clusterService.nodeName(),
+ clusterService.messagingService(),
+ new NoOpFailureManager()
+ );
+
this.raftClient = raftManager.startRaftGroupNode(
new RaftNodeId(GROUP_ID, new Peer(nodeName)),
fromConsistentIds(cluster.keySet()),
- new PartitionListener(
+ new ZonePartitionRaftListener(
+ GROUP_ID,
+ mock(TxStatePartitionStorage.class),
txManagerMock,
- mock(PartitionDataStorage.class),
- mock(StorageUpdateHandler.class),
safeTs,
- mock(CatalogService.class),
- mock(SchemaRegistry.class),
- mock(IndexMetaStorage.class),
-
clusterService.topologyService().localMember().id(),
- mock(MinimumRequiredTimeCollectorService.class),
- mock(Executor.class),
- placementDriver,
- clockService,
- GROUP_ID
+
mock(PendingIndependentComparableValuesTracker.class),
+ outgoingSnapshotsManager,
+ mock(Executor.class)
) {
@Override
public void
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b6c672be1f0..928f430d891 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -166,7 +166,7 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
@@ -933,7 +933,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
raftClient
);
- var tablePartitionRaftListener = new PartitionListener(
+ var tablePartitionRaftListener = new TablePartitionProcessor(
txManager,
partitionDataStorage,
partitionUpdateHandlers.storageUpdateHandler,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
similarity index 86%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index 10eb40d2cbb..c88fa1de9c6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -30,19 +30,14 @@ import static
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAt
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
-import java.nio.file.Path;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.function.Consumer;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
@@ -55,15 +50,10 @@ import
org.apache.ignite.internal.partition.replicator.raft.handlers.CommandHand
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
-import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.service.CommandClosure;
-import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
-import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -85,11 +75,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Partition command handler.
*/
-// TODO ignite-22522 Rename to TablePartitionProcessor and remove implements
RaftGroupListener
-public class PartitionListener implements RaftGroupListener,
RaftTableProcessor {
- /** Logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(PartitionListener.class);
-
+public class TablePartitionProcessor implements RaftTableProcessor {
/** Transaction manager. */
private final TxManager txManager;
@@ -123,7 +109,7 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
private ReplicaMeta lastKnownLease;
/** Constructor. */
- public PartitionListener(
+ public TablePartitionProcessor(
TxManager txManager,
PartitionDataStorage partitionDataStorage,
StorageUpdateHandler storageUpdateHandler,
@@ -195,66 +181,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
}
}
- @Override
- public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
- iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo)
-> {
- Command command = clo.command();
-
- assert false : "No read commands expected, [cmd=" + command + ']';
- });
- }
-
- @Override
- public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
- iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo)
-> {
- WriteCommand command = clo.command();
-
- long commandIndex = clo.index();
- long commandTerm = clo.term();
- @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
- assert safeTimestamp == null || command instanceof
SafeTimePropagatingCommand : command;
-
- long storagesAppliedIndex = storage.lastAppliedIndex();
-
- assert commandIndex > storagesAppliedIndex :
- "Write command must have an index greater than that of
storages [commandIndex=" + commandIndex
- + ", mvAppliedIndex=" + storage.lastAppliedIndex()
+ "]";
-
- CommandResult result;
-
- // NB: Make sure that ANY command we accept here updates
lastAppliedIndex+term info in one of the underlying
- // storages!
- // Otherwise, a gap between lastAppliedIndex from the point of
view of JRaft and our storage might appear.
- // If a leader has such a gap, and does doSnapshot(), it will
subsequently truncate its log too aggressively
- // in comparison with 'snapshot' state stored in our storages; and
if we install a snapshot from our storages
- // to a follower at this point, for a subsequent AppendEntries the
leader will not be able to get prevLogTerm
- // (because it's already truncated in the leader's log), so it
will have to install a snapshot again, and then
- // repeat same thing over and over again.
-
- storage.acquirePartitionSnapshotsReadLock();
-
- try {
- result = processCommand(command, commandIndex, commandTerm,
safeTimestamp);
- } catch (Throwable t) {
- LOG.error(
- "Got error while processing command [commandIndex={},
commandTerm={}, command={}]",
- t,
- clo.index(), clo.index(), command
- );
-
- clo.result(t);
-
- throw t;
- } finally {
- storage.releasePartitionSnapshotsReadLock();
- }
-
- // Completing the closure out of the partition snapshots lock to
reduce possibility of deadlocks as it might
- // trigger other actions taking same locks.
- clo.result(result.result());
- });
- }
-
@Override
public CommandResult processCommand(
WriteCommand command,
@@ -567,17 +493,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
currentGroupTopology.addAll(config.learners());
}
- @Override
- public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
- throw new UnsupportedOperationException("!!! It's not expected that
PartitionListener onSnapshotSave will be called.");
-
- }
-
- @Override
- public boolean onSnapshotLoad(Path path) {
- return true;
- }
-
@Override
public void onShutdown() {
storage.close();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
index 73bb636fa4c..f5336c82260 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
@@ -17,7 +17,7 @@
/**
* This package contains RAFT command handlers that is used by
- * {@link org.apache.ignite.internal.table.distributed.raft.PartitionListener}
aka table raft processor.
+ * {@link
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor} aka
table raft processor.
*/
package org.apache.ignite.internal.table.distributed.raft.handlers;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 2403a7d2f1f..d64bef61ea6 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -22,11 +22,9 @@ import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.deriveUuidFrom;
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -34,7 +32,6 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
@@ -45,22 +42,18 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.ignite.distributed.TestPartitionDataStorage;
@@ -81,15 +74,14 @@ import
org.apache.ignite.internal.partition.replicator.network.command.BuildInde
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
-import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
@@ -131,9 +123,6 @@ import
org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.UpdateCommandResult;
-import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
-import
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Cursor;
@@ -143,8 +132,6 @@ import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -173,7 +160,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private static final SchemaRegistry SCHEMA_REGISTRY = new
DummySchemaManagerImpl(SCHEMA);
- private PartitionListener commandListener;
+ private TablePartitionProcessor commandListener;
private final AtomicLong raftIndex = new AtomicLong();
@@ -196,8 +183,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private final PartitionDataStorage partitionDataStorage = spy(new
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage));
- private final TxStatePartitionStorage txStatePartitionStorage = spy(new
TestTxStatePartitionStorage());
-
@WorkDirectory
private Path workDir;
@@ -210,12 +195,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private SafeTimeValuesTracker safeTimeTracker;
- @Captor
- private ArgumentCaptor<Throwable> commandClosureResultCaptor;
-
- @Captor
- private ArgumentCaptor<UpdateCommandResult>
updateCommandClosureResultCaptor;
-
private final RaftGroupConfigurationConverter
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
private IndexUpdateHandler indexUpdateHandler;
@@ -295,7 +274,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(hybridClock.current());
- commandListener = new PartitionListener(
+ commandListener = new TablePartitionProcessor(
mock(TxManager.class),
partitionDataStorage,
storageUpdateHandler,
@@ -336,7 +315,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
.build();
-
commandListener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(),
1, command, null, null)).iterator());
+ commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, null);
}
}
@@ -410,44 +389,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
readAndCheck(false);
}
- private CommandClosure<WriteCommand> writeCommandCommandClosure(
- long index,
- long term,
- WriteCommand writeCommand
- ) {
- return writeCommandCommandClosure(index, term, writeCommand, null,
hybridClock.now());
- }
-
- /**
- * Create a command closure.
- *
- * @param index Index of the RAFT command.
- * @param term Term of RAFT command.
- * @param writeCommand Write command.
- * @param resultClosureCaptor Captor for {@link
CommandClosure#result(Serializable)}
- * @param safeTimestamp The safe timestamp.
- */
- private static CommandClosure<WriteCommand> writeCommandCommandClosure(
- long index,
- long term,
- WriteCommand writeCommand,
- @Nullable ArgumentCaptor<? extends Serializable>
resultClosureCaptor,
- @Nullable HybridTimestamp safeTimestamp
- ) {
- CommandClosure<WriteCommand> commandClosure =
mock(CommandClosure.class);
-
- when(commandClosure.index()).thenReturn(index);
- when(commandClosure.term()).thenReturn(term);
- when(commandClosure.command()).thenReturn(writeCommand);
- when(commandClosure.safeTimestamp()).thenReturn(safeTimestamp);
-
- if (resultClosureCaptor != null) {
-
doNothing().when(commandClosure).result(resultClosureCaptor.capture());
- }
-
- return commandClosure;
- }
-
@Test
void updatesLastAppliedForUpdateCommands() {
safeTimeTracker.update(hybridClock.now(), null);
@@ -461,9 +402,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.initiatorTime(hybridClock.now())
.build();
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, command)
- ).iterator());
+ commandListener.processCommand(command, 3, 2, hybridClock.now());
verify(mvPartitionStorage).lastApplied(3, 2);
}
@@ -484,9 +423,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.initiatorTime(hybridClock.now())
.build();
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, command)
- ).iterator());
+ commandListener.processCommand(command, 3, 2, hybridClock.now());
verify(mvPartitionStorage).lastApplied(3, 2);
}
@@ -497,15 +434,12 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.safeTimeSyncCommand()
.initiatorTime(hybridClock.now());
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(),
commandClosureResultCaptor, hybridClock.now())
- ).iterator());
+ commandListener.processCommand(safeTimeSyncCommand.build(), 3, 2,
hybridClock.now());
InOrder inOrder = inOrder(partitionDataStorage);
inOrder.verify(partitionDataStorage).acquirePartitionSnapshotsReadLock();
inOrder.verify(partitionDataStorage).lastApplied(3, 2);
-
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
}
@Test
@@ -697,8 +631,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private void applySafeTimeCommand(Class<? extends
SafeTimePropagatingCommand> cls, HybridTimestamp timestamp) {
SafeTimePropagatingCommand command = mock(cls);
- CommandClosure<WriteCommand> closure = writeCommandCommandClosure(3,
1, command, commandClosureResultCaptor, timestamp);
- commandListener.onWrite(asList(closure).iterator());
+ commandListener.processCommand(command, 3, 1, timestamp);
assertEquals(timestamp, safeTimeTracker.current());
}
@@ -713,9 +646,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.safeTime(hybridClock.now())
.build();
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(raftIndex.incrementAndGet(), 2,
command)
- ).iterator());
+ commandListener.processCommand(command, raftIndex.incrementAndGet(),
2, hybridClock.now());
verify(mvPartitionStorage).lastApplied(raftIndex.get(), 2);
}
@@ -730,74 +661,11 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
.safeTime(hybridClock.now())
.build();
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, safeTimeSyncCommand)
- ).iterator());
+ commandListener.processCommand(safeTimeSyncCommand, 3, 2,
hybridClock.now());
verify(mvPartitionStorage).lastApplied(3, 2);
}
- /**
- * Prepares a closure iterator for a specific batch operation.
- *
- * @param func The function prepare a closure for the operation.
- * @param <T> Type of the operation.
- * @return Closure iterator.
- */
- private <T extends Command> Iterator<CommandClosure<T>>
batchIterator(Consumer<CommandClosure<T>> func) {
- return new Iterator<>() {
- boolean moved;
-
- @Override
- public boolean hasNext() {
- return !moved;
- }
-
- @Override
- public CommandClosure<T> next() {
- CommandClosure<T> clo = mock(CommandClosure.class);
-
- func.accept(clo);
-
- moved = true;
-
- return clo;
- }
- };
- }
-
- /**
- * Prepares a closure iterator for a specific operation.
- *
- * @param func The function prepare a closure for the operation.
- * @param <T> Type of the operation.
- * @return Closure iterator.
- */
- private <T extends Command> Iterator<CommandClosure<T>>
iterator(BiConsumer<Integer, CommandClosure<T>> func) {
- return new Iterator<>() {
- /** Iteration. */
- private int it = 0;
-
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- return it < KEY_COUNT;
- }
-
- /** {@inheritDoc} */
- @Override
- public CommandClosure<T> next() {
- CommandClosure<T> clo = mock(CommandClosure.class);
-
- func.accept(it, clo);
-
- it++;
-
- return clo;
- }
- };
- }
-
/**
* Inserts all rows.
*/
@@ -841,7 +709,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
/**
* Update values from the listener in the batch operation.
*
- * @param keyValueMapper Mep a value to update to the iter number.
+ * @param keyValueMapper Map a value to update to the iter number.
*/
private void updateAll(Function<Integer, Integer> keyValueMapper) {
UUID txId = TestTransactionIds.newTransactionId();
@@ -922,41 +790,33 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
/**
* Update rows.
*
- * @param keyValueMapper Mep a value to update to the iter number.
+ * @param keyValueMapper Map a value to update to the iter number.
*/
private void update(Function<Integer, Integer> keyValueMapper) {
List<UUID> txIds = new ArrayList<>();
- commandListener.onWrite(iterator((i, clo) -> {
+ for (int i = 0; i < KEY_COUNT; i++) {
UUID txId = TestTransactionIds.newTransactionId();
BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i));
ReadResult readResult = readRow(getTestKey(i));
txIds.add(txId);
- when(clo.safeTimestamp()).thenReturn(hybridClock.now());
- when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
- when(clo.command()).thenReturn(
- PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
- .tableId(TABLE_ID)
- .commitPartitionId(defaultPartitionIdMessage())
- .rowUuid(readResult.rowId().uuid())
-
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
- .binaryRowMessage(row)
- .build())
- .txId(txId)
- .initiatorTime(hybridClock.now())
- .safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
- .build());
-
- doAnswer(invocation -> {
- assertTrue(invocation.getArgument(0) instanceof
UpdateCommandResult);
-
- return null;
- }).when(clo).result(any());
- }));
+ UpdateCommandV2 command =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+ .tableId(TABLE_ID)
+ .commitPartitionId(defaultPartitionIdMessage())
+ .rowUuid(readResult.rowId().uuid())
+
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
+ .binaryRowMessage(row)
+ .build())
+ .txId(txId)
+ .initiatorTime(hybridClock.now())
+ .safeTime(hybridClock.now())
+ .txCoordinatorId(UUID.randomUUID())
+ .build();
+
+ commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, hybridClock.now());
+ }
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -983,32 +843,24 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private void delete() {
List<UUID> txIds = new ArrayList<>();
- commandListener.onWrite(iterator((i, clo) -> {
+ for (int i = 0; i < KEY_COUNT; i++) {
UUID txId = TestTransactionIds.newTransactionId();
ReadResult readResult = readRow(getTestKey(i));
txIds.add(txId);
- when(clo.safeTimestamp()).thenReturn(hybridClock.now());
- when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
- when(clo.command()).thenReturn(
- PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
- .tableId(TABLE_ID)
- .commitPartitionId(defaultPartitionIdMessage())
- .rowUuid(readResult.rowId().uuid())
- .txId(txId)
- .initiatorTime(hybridClock.now())
- .safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
- .build());
-
- doAnswer(invocation -> {
- assertTrue(invocation.getArgument(0) instanceof
UpdateCommandResult);
+ UpdateCommandV2 command =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+ .tableId(TABLE_ID)
+ .commitPartitionId(defaultPartitionIdMessage())
+ .rowUuid(readResult.rowId().uuid())
+ .txId(txId)
+ .initiatorTime(hybridClock.now())
+ .safeTime(hybridClock.now())
+ .txCoordinatorId(UUID.randomUUID())
+ .build();
- return null;
- }).when(clo).result(any());
- }));
+ commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, hybridClock.now());
+ }
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -1061,33 +913,25 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private void insert() {
List<UUID> txIds = new ArrayList<>();
- commandListener.onWrite(iterator((i, clo) -> {
- UUID txId = TestTransactionIds.newTransactionId();
- txIds.add(txId);
+ for (int i = 0; i < KEY_COUNT; i++) {
+ UUID txId0 = TestTransactionIds.newTransactionId();
+ txIds.add(txId0);
+
+ UpdateCommandV2 command =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
+ .tableId(TABLE_ID)
+ .commitPartitionId(defaultPartitionIdMessage())
+ .rowUuid(UUID.randomUUID())
+
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
+ .binaryRowMessage(getTestRow(i, i))
+ .build())
+ .txId(txId0)
+ .initiatorTime(hybridClock.now())
+ .safeTime(hybridClock.now())
+ .txCoordinatorId(UUID.randomUUID())
+ .build();
- when(clo.safeTimestamp()).thenReturn(hybridClock.now());
- when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
- when(clo.command()).thenReturn(
- PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
- .tableId(TABLE_ID)
- .commitPartitionId(defaultPartitionIdMessage())
- .rowUuid(UUID.randomUUID())
-
.messageRowToUpdate(PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
- .binaryRowMessage(getTestRow(i, i))
- .build())
- .txId(txId)
- .initiatorTime(hybridClock.now())
- .safeTime(hybridClock.now())
- .txCoordinatorId(UUID.randomUUID())
- .build());
-
- doAnswer(invocation -> {
- assertTrue(invocation.getArgument(0) instanceof
UpdateCommandResult);
-
- return null;
- }).when(clo).result(any());
- }));
+ commandListener.processCommand(command,
raftIndex.incrementAndGet(), 1, hybridClock.now());
+ }
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -1135,22 +979,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
}
private void invokeBatchedCommand(WriteCommand cmd) {
- commandListener.onWrite(batchIterator(clo -> {
- when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
- doAnswer(invocation -> {
- if (cmd instanceof UpdateCommand || cmd instanceof
UpdateAllCommand) {
- assertTrue(invocation.getArgument(0) instanceof
UpdateCommandResult);
- } else {
- assertNull(invocation.getArgument(0));
- }
-
- return null;
- }).when(clo).result(any());
-
- when(clo.safeTimestamp()).thenReturn(hybridClock.now());
- when(clo.command()).thenReturn(cmd);
- }));
+ commandListener.processCommand(cmd, raftIndex.incrementAndGet(), 1,
hybridClock.now());
}
private @Nullable ReadResult readRow(BinaryTuple pk) {
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 41875bd2bfe..2ec4dea377d 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -156,7 +156,7 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
@@ -891,7 +891,7 @@ public class ItTxTestCluster {
)
);
- PartitionListener tablePartitionRaftListener = new PartitionListener(
+ TablePartitionProcessor tablePartitionRaftListener = new
TablePartitionProcessor(
txManagers.get(assignment),
partitionDataStorage,
storageUpdateHandler,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
index e2411867a9e..9a4cb38b1f0 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
@@ -56,7 +56,7 @@ import
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -299,11 +299,11 @@ public abstract class TxInfrastructureTest extends
IgniteAbstractTest {
var fsm = (JraftServerImpl.DelegatingStateMachine)
grp.getRaftNode().getOptions().getFsm();
- PartitionListener listener;
+ TablePartitionProcessor listener;
if (colocationEnabled()) {
- listener = (PartitionListener) ((ZonePartitionRaftListener)
fsm.getListener()).tableProcessor(table.tableId());
+ listener = (TablePartitionProcessor)
((ZonePartitionRaftListener) fsm.getListener()).tableProcessor(table.tableId());
} else {
- listener = (PartitionListener) fsm.getListener();
+ listener = (TablePartitionProcessor) fsm.getListener();
}
MvPartitionStorage storage = listener.getMvStorage();
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0d9d6d4872c..0caf6b7f99d 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -123,7 +123,7 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -523,7 +523,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
lenient().when(clockService.current()).thenReturn(clock.current());
PendingComparableValuesTracker<Long, Void> storageIndexTracker = new
PendingComparableValuesTracker<>(0L);
- var tablePartitionListener = new PartitionListener(
+ var tablePartitionListener = new TablePartitionProcessor(
this.txManager,
new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage),
storageUpdateHandler,