This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2f74b03afd8 IGNITE-27458 Stop PartitionReplicaListener being ReplicaListener (#7313)
2f74b03afd8 is described below
commit 2f74b03afd84a20eb5b7a7f04e8d09fa1f1db2bc
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Dec 26 11:24:48 2025 +0400
IGNITE-27458 Stop PartitionReplicaListener being ReplicaListener (#7313)
---
.../partition/replicator/ReplicaPrimacy.java | 7 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 29 +---
.../internal/table/distributed/TableManager.java | 2 -
.../replicator/PartitionReplicaListener.java | 37 +-----
.../PartitionReplicaListenerIndexLockingTest.java | 14 +-
...itionReplicaListenerSortedIndexLockingTest.java | 14 +-
.../replication/PartitionReplicaListenerTest.java | 148 ++++++++++-----------
.../ZonePartitionReplicaListenerTest.java | 14 +-
.../storage/InternalTableEstimatedSizeTest.java | 28 +++-
.../apache/ignite/distributed/ItTxTestCluster.java | 2 -
.../table/impl/DummyInternalTableImpl.java | 2 -
11 files changed, 124 insertions(+), 173 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
index 83349dd3967..c2917d753da 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java
@@ -23,6 +23,7 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.ReadO
import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* Represents replica primacy info. Contains the following information:
@@ -54,14 +55,16 @@ public class ReplicaPrimacy {
/**
* Creates an instance representing information about the primary replica
held by this node.
*/
- static ReplicaPrimacy forPrimaryReplicaRequest(long leaseStartTime) {
+ @VisibleForTesting
+ public static ReplicaPrimacy forPrimaryReplicaRequest(long leaseStartTime)
{
return new ReplicaPrimacy(leaseStartTime, null);
}
/**
* Creates an instance representing information about whether this node
currently holds the primary.
*/
- static ReplicaPrimacy forIsPrimary(boolean isPrimary) {
+ @VisibleForTesting
+ public static ReplicaPrimacy forIsPrimary(boolean isPrimary) {
return new ReplicaPrimacy(null, isPrimary);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 6c04a27b134..0e22ded3895 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -17,7 +17,6 @@
package org.apache.ignite.distributed;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -28,7 +27,6 @@ import static org.mockito.Mockito.mock;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -48,10 +46,8 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -64,7 +60,6 @@ import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -72,7 +67,6 @@ import
org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -204,10 +198,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
transactionStateResolver,
storageUpdateHandler,
validationSchemasSource,
- localNode,
schemaSyncService,
catalogService,
- placementDriver,
clusterNodeResolver,
resourcesRegistry,
schemaRegistry,
@@ -215,22 +207,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
lowWatermark,
mock(FailureProcessor.class),
new
TableMetricSource(QualifiedName.fromSimple("test_table"))
- ) {
- @Override
- public CompletableFuture<ReplicaResult>
invoke(ReplicaRequest request, UUID senderId) {
- if (request instanceof
WriteIntentSwitchReplicaRequest) {
- logger().info("Dropping cleanup request: {}",
request);
-
- releaseTxLocks(
- ((WriteIntentSwitchReplicaRequest)
request).txId(),
- txManager.lockManager()
- );
-
- return completedFuture(new ReplicaResult(null,
null));
- }
- return super.invoke(request, senderId);
- }
- };
+ );
}
};
@@ -291,10 +268,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
assertEquals(200., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
}
- private static void releaseTxLocks(UUID txId, LockManager lockManager) {
- lockManager.releaseAll(txId);
- }
-
@Override
protected int nodes() {
return 1;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 68fbcf25b0c..18bc6f9650a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1077,10 +1077,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
transactionStateResolver,
partitionUpdateHandlers.storageUpdateHandler,
new CatalogValidationSchemasSource(catalogService,
schemaManager),
- localNode(),
executorInclinedSchemaSyncService,
catalogService,
- executorInclinedPlacementDriver,
topologyService,
remotelyTriggeredResourceRegistry,
schemaManager.schemaRegistry(table.tableId()),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 498feb1668c..6169c45def3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -94,11 +94,9 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
-import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
@@ -133,9 +131,7 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.ScanC
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import
org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException;
import
org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
-import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.raft.Command;
-import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.CommandApplicationResult;
import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -145,7 +141,6 @@ import
org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
-import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -206,7 +201,7 @@ import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/** Partition replication listener. */
-public class PartitionReplicaListener implements ReplicaListener,
ReplicaTableProcessor {
+public class PartitionReplicaListener implements ReplicaTableProcessor {
/**
* NB: this listener makes writes to the underlying MV partition storage
without taking the partition snapshots read lock. This causes
* the RAFT snapshots transferred to a follower being slightly
inconsistent for a limited amount of time.
@@ -257,9 +252,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
/** Versioned partition storage. */
private final MvPartitionStorage mvDataStorage;
- /** Raft client. */
- private final RaftCommandRunner raftCommandRunner;
-
/** Tx manager. */
private final TxManager txManager;
@@ -317,7 +309,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
private final TableMetricSource metrics;
- private final ReplicaPrimacyEngine replicaPrimacyEngine;
private final TableAwareReplicaRequestPreProcessor
tableAwareReplicaRequestPreProcessor;
private final ReliableCatalogVersions reliableCatalogVersions;
private final ReplicationRaftCommandApplicator raftCommandApplicator;
@@ -341,9 +332,7 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
* @param safeTime Safe time clock.
* @param transactionStateResolver Transaction state resolver.
* @param storageUpdateHandler Handler that processes updates writing them
to storage.
- * @param localNode Instance of the local node.
* @param catalogService Catalog service.
- * @param placementDriver Placement driver.
* @param clusterNodeResolver Node resolver.
* @param remotelyTriggeredResourceRegistry Resource registry.
* @param indexMetaStorage Index meta storage.
@@ -365,10 +354,8 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
TransactionStateResolver transactionStateResolver,
StorageUpdateHandler storageUpdateHandler,
ValidationSchemasSource validationSchemasSource,
- InternalClusterNode localNode,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
- LeasePlacementDriver placementDriver,
ClusterNodeResolver clusterNodeResolver,
RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
SchemaRegistry schemaRegistry,
@@ -378,7 +365,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
TableMetricSource metrics
) {
this.mvDataStorage = mvDataStorage;
- this.raftCommandRunner = raftCommandRunner;
this.txManager = txManager;
this.lockManager = lockManager;
this.scanRequestExecutor = scanRequestExecutor;
@@ -399,13 +385,11 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
this.tableLockKey = new TablePartitionId(tableId,
replicationGroupId.partitionId());
this.metrics = metrics;
- this.schemaCompatValidator = new
SchemaCompatibilityValidator(validationSchemasSource, catalogService,
schemaSyncService);
+ schemaCompatValidator = new
SchemaCompatibilityValidator(validationSchemasSource, catalogService,
schemaSyncService);
indexBuildingProcessor = new
PartitionReplicaBuildIndexProcessor(busyLock, tableId, indexMetaStorage,
catalogService);
- replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver,
clockService, replicationGroupId, localNode);
-
- this.tableAwareReplicaRequestPreProcessor = new
TableAwareReplicaRequestPreProcessor(
+ tableAwareReplicaRequestPreProcessor = new
TableAwareReplicaRequestPreProcessor(
clockService,
schemaCompatValidator,
schemaSyncService
@@ -417,12 +401,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
buildIndexReplicaRequestHandler = new
BuildIndexReplicaRequestHandler(indexMetaStorage, raftCommandApplicator);
}
- @Override
- public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request,
UUID senderId) {
- return replicaPrimacyEngine.validatePrimacy(request)
- .thenCompose(replicaPrimacy ->
processRequestInContext(request, replicaPrimacy, senderId));
- }
-
@Override
public CompletableFuture<ReplicaResult> process(ReplicaRequest request,
ReplicaPrimacy replicaPrimacy, UUID senderId) {
return processRequestInContext(request, replicaPrimacy, senderId);
@@ -445,15 +423,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
}
}
- /** Returns Raft-client. */
- @Override
- public RaftCommandRunner raftClient() {
- if (raftCommandRunner instanceof ExecutorInclinedRaftCommandRunner) {
- return ((ExecutorInclinedRaftCommandRunner)
raftCommandRunner).decoratedCommandRunner();
- }
- return raftCommandRunner;
- }
-
private CompletableFuture<?> processRequest(ReplicaRequest request,
ReplicaPrimacy replicaPrimacy, UUID senderId) {
boolean hasSchemaVersion = request instanceof
SchemaVersionAwareReplicaRequest;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 77a47bb14ef..e78d234f72f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -72,9 +72,9 @@ import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
-import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -248,8 +248,6 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
when(catalog.indexes(anyInt())).thenReturn(List.of(indexDescriptor));
- InternalClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
-
partitionReplicaListener = new PartitionReplicaListener(
TEST_MV_PARTITION_STORAGE,
mockRaftClient,
@@ -279,10 +277,8 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
new DummyValidationSchemasSource(schemaManager),
- localNode,
new AlwaysSyncedSchemaSyncService(),
catalogService,
- new TestPlacementDriver(localNode),
mock(ClusterNodeResolver.class),
new RemotelyTriggeredResourceRegistry(),
schemaManager,
@@ -395,7 +391,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.invoke(request,
LOCAL_NODE_ID);
+ CompletableFuture<?> fut = partitionReplicaListener.process(request,
replicaPrimacy(), LOCAL_NODE_ID);
await(fut);
@@ -422,6 +418,10 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
);
}
+ private ReplicaPrimacy replicaPrimacy() {
+ return
ReplicaPrimacy.forPrimaryReplicaRequest(HybridTimestamp.MIN_VALUE.longValue());
+ }
+
/** Verifies the mode in which the lock was acquired on the index key for
a particular operation. */
@ParameterizedTest
@MethodSource("readWriteMultiTestArguments")
@@ -484,7 +484,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.invoke(request,
LOCAL_NODE_ID);
+ CompletableFuture<?> fut = partitionReplicaListener.process(request,
replicaPrimacy(), LOCAL_NODE_ID);
await(fut);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index 8d63b89dba4..cc7fe16c6c7 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -73,9 +73,9 @@ import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
-import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -223,8 +223,6 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
when(catalog.indexes(anyInt())).thenReturn(List.of(indexDescriptor));
- InternalClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
-
partitionReplicaListener = new PartitionReplicaListener(
TEST_MV_PARTITION_STORAGE,
mockRaftClient,
@@ -249,10 +247,8 @@ public class
PartitionReplicaListenerSortedIndexLockingTest extends IgniteAbstra
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
new DummyValidationSchemasSource(schemaManager),
- localNode,
new AlwaysSyncedSchemaSyncService(),
catalogService,
- new TestPlacementDriver(localNode),
mock(ClusterNodeResolver.class),
new RemotelyTriggeredResourceRegistry(),
schemaManager,
@@ -293,6 +289,10 @@ public class
PartitionReplicaListenerSortedIndexLockingTest extends IgniteAbstra
return txManager;
}
+ private static ReplicaPrimacy validRwPrimacy() {
+ return ReplicaPrimacy.forPrimaryReplicaRequest(1);
+ }
+
@BeforeEach
public void beforeTest() {
((TestSortedIndexStorage) pkStorage.get().storage()).clear();
@@ -365,7 +365,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.invoke(request,
LOCAL_NODE_ID);
+ CompletableFuture<?> fut = partitionReplicaListener.process(request,
validRwPrimacy(), LOCAL_NODE_ID);
await(fut);
@@ -444,7 +444,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.invoke(request,
LOCAL_NODE_ID);
+ CompletableFuture<?> fut = partitionReplicaListener.process(request,
validRwPrimacy(), LOCAL_NODE_ID);
await(fut);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dd436d029fe..262e01d38af 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -135,6 +135,8 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.CatalogVersionAware;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
@@ -473,6 +475,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@Mock
private IndexMetaStorage indexMetaStorage;
+ private ReplicaPrimacyEngine primacyEngine;
+
private static UUID nodeId(int id) {
return new UUID(0, id);
}
@@ -660,10 +664,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
validationSchemasSource,
- localNode,
schemaSyncService,
catalogService,
- placementDriver,
new SingleClusterNodeResolver(localNode),
new RemotelyTriggeredResourceRegistry(),
new DummySchemaManagerImpl(schemaDescriptor,
schemaDescriptorVersion2),
@@ -678,6 +680,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
when(lowWatermark.tryLock(any(), any())).thenReturn(true);
+ primacyEngine = new ReplicaPrimacyEngine(placementDriver,
clockService, grpId, localNode);
+
reset();
}
@@ -760,7 +764,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<ReplicaResult> invokeListener(ReplicaRequest
request) {
- return partitionReplicaListener.invoke(request, localNode.id());
+ return processWithPrimacy(request);
+ }
+
+ private static ReplicaPrimacy validRoPrimacy() {
+ return ReplicaPrimacy.forIsPrimary(true);
}
private ReadOnlySingleRowPkReplicaRequest
readOnlySingleRowPkReplicaRequest(BinaryRow pk, HybridTimestamp readTimestamp) {
@@ -891,7 +899,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID scanTxId = newTxId();
// Request first batch
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
+ CompletableFuture<ReplicaResult> fut = processWithPrimacy(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -904,7 +912,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(), localNode.id());
+ .build());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1,
TimeUnit.SECONDS).result();
@@ -912,7 +920,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(4, rows.size());
// Request second batch
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+ fut =
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(scanTxId)
@@ -924,7 +932,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(), localNode.id());
+ .build());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -932,7 +940,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(2, rows.size());
// Request bounded.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+ fut =
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -947,7 +955,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(), localNode.id());
+ .build());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -955,7 +963,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(2, rows.size());
// Empty result.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+ fut =
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -968,7 +976,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(), localNode.id());
+ .build());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -976,7 +984,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(0, rows.size());
// Lookup.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+ fut =
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -989,7 +997,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(), localNode.id());
+ .build());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1019,7 +1027,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID scanTxId = newTxId();
// Request first batch
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
+ CompletableFuture<ReplicaResult> fut =
partitionReplicaListener.process(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -1029,7 +1037,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.indexToUse(sortedIndexId)
.batchSize(4)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1,
TimeUnit.SECONDS).result();
@@ -1037,7 +1045,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(4, rows.size());
// Request second batch
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(scanTxId)
@@ -1046,7 +1054,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.indexToUse(sortedIndexId)
.batchSize(4)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1054,7 +1062,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(2, rows.size());
// Request bounded.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1066,7 +1074,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.flags(SortedIndexStorage.LESS_OR_EQUAL)
.batchSize(5)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1074,7 +1082,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(2, rows.size());
// Empty result.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1084,7 +1092,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.lowerBoundPrefix(toIndexBound(5))
.batchSize(5)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1092,7 +1100,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(0, rows.size());
// Lookup.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1102,7 +1110,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.exactKey(toIndexKey(0))
.batchSize(5)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1132,7 +1140,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID scanTxId = newTxId();
// Request first batch
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
+ CompletableFuture<ReplicaResult> fut =
partitionReplicaListener.process(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -1143,7 +1151,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.exactKey(toIndexKey(0))
.batchSize(3)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1,
TimeUnit.SECONDS).result();
@@ -1151,7 +1159,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(3, rows.size());
// Request second batch
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(scanTxId)
@@ -1161,7 +1169,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.exactKey(toIndexKey(0))
.batchSize(1)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1169,7 +1177,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(1, rows.size());
// Empty result.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1179,7 +1187,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.exactKey(toIndexKey(5))
.batchSize(5)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1187,7 +1195,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(0, rows.size());
// Lookup.
- fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1197,7 +1205,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.exactKey(toIndexKey(1))
.batchSize(5)
.coordinatorId(localNode.id())
- .build(), localNode.id());
+ .build(), validRoPrimacy(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1343,7 +1351,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
boolean full,
@Nullable String txLabel
) {
- return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+ return
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(txId)
@@ -1356,8 +1364,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.full(full)
.timestamp(clock.now())
.txLabel(txLabel)
- .build(),
- localNode.id()
+ .build()
);
}
@@ -1376,7 +1383,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
boolean full,
@Nullable String txLabel
) {
- return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
+ return
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(txId)
@@ -1389,8 +1396,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.full(full)
.timestamp(clock.now())
.txLabel(txLabel)
- .build(),
- localNode.id()
+ .build()
);
}
@@ -1420,7 +1426,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
boolean full,
@Nullable String txLabel
) {
- return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+ return
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(txId)
@@ -1433,8 +1439,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.full(full)
.timestamp(clock.now())
.txLabel(txLabel)
- .build(),
- localNode.id()
+ .build()
);
}
@@ -1457,7 +1462,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
boolean full,
@Nullable String txLabel
) {
- return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
+ return
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(txId)
@@ -1470,8 +1475,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.full(full)
.timestamp(clock.now())
.txLabel(txLabel)
- .build(),
- localNode.id()
+ .build()
);
}
@@ -1785,7 +1789,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow
oldRow, BinaryRow newRow, boolean full) {
- return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
+ return
processWithPrimacy(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(targetTxId)
@@ -1798,15 +1802,14 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.coordinatorId(localNode.id())
.full(full)
.timestamp(clock.now())
- .build(),
- localNode.id()
+ .build()
);
}
@Test
public void
failsWhenScanByExactMatchReadsTupleWithIncompatibleSchemaFromFuture() {
testFailsWhenReadingFromFutureIncompatibleSchema(
- (targetTxId, key) -> partitionReplicaListener.invoke(
+ (targetTxId, key) -> processWithPrimacy(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -1819,8 +1822,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(),
- localNode.id()
+ .build()
)
);
}
@@ -1828,7 +1830,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@Test
public void
failsWhenScanByIndexReadsTupleWithIncompatibleSchemaFromFuture() {
testFailsWhenReadingFromFutureIncompatibleSchema(
- (targetTxId, key) -> partitionReplicaListener.invoke(
+ (targetTxId, key) -> processWithPrimacy(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -1840,8 +1842,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(),
- localNode.id()
+ .build()
)
);
}
@@ -1928,7 +1929,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<?> doRwScanRetrieveBatchRequest(UUID targetTxId)
{
- return partitionReplicaListener.invoke(
+ return processWithPrimacy(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -1940,24 +1941,19 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.timestamp(clock.now())
- .build(),
- localNode.id()
+ .build()
);
}
private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) {
- ZonePartitionIdMessage serializedMsg =
- zonePartitionIdMessage(new
ZonePartitionId(tableDescriptor.zoneId(), grpId.partitionId()));
-
- return partitionReplicaListener.invoke(
+ return processWithPrimacy(
TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest()
- .groupId(serializedMsg)
+ .groupId(zonePartitionIdMessage(new
ZonePartitionId(tableDescriptor.zoneId(), grpId.partitionId())))
.tableId(grpId.zoneId())
.transactionId(targetTxId)
.timestamp(beginTimestamp(targetTxId))
.scanId(1)
- .build(),
- localNode.id()
+ .build()
);
}
@@ -2313,7 +2309,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.timestamp(clock.now())
.build();
- return partitionReplicaListener.invoke(message, localNode.id());
+ return processWithPrimacy(message);
}
private void delete(UUID txId, BinaryRow row) {
@@ -2330,7 +2326,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.timestamp(clock.now())
.build();
- assertThat(partitionReplicaListener.invoke(message, localNode.id()),
willCompleteSuccessfully());
+ assertThat(processWithPrimacy(message), willCompleteSuccessfully());
}
private BinaryRow roGet(BinaryRow row, HybridTimestamp readTimestamp) {
@@ -2344,7 +2340,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row,
HybridTimestamp readTimestamp) {
ReadOnlySingleRowPkReplicaRequest message =
readOnlySingleRowPkReplicaRequest(row, readTimestamp);
- return partitionReplicaListener.invoke(message,
localNode.id()).thenApply(replicaResult -> (BinaryRow) replicaResult.result());
+ return partitionReplicaListener.process(message, validRoPrimacy(),
localNode.id())
+ .thenApply(replicaResult -> (BinaryRow)
replicaResult.result());
}
private List<BinaryRow> roGetAll(Collection<BinaryRow> rows,
HybridTimestamp readTimestamp) {
@@ -2743,7 +2740,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
PrimaryReplicaRequest request = mock(requestClass);
- assertThat(partitionReplicaListener.invoke(request, localNode.id()),
willThrow(PrimaryReplicaMissException.class));
+ assertThat(processWithPrimacy(request),
willThrow(PrimaryReplicaMissException.class));
}
@ParameterizedTest
@@ -2757,7 +2754,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
PrimaryReplicaRequest request = mock(requestClass);
when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime -
1000);
- assertThat(partitionReplicaListener.invoke(request, localNode.id()),
willThrow(PrimaryReplicaMissException.class));
+ assertThat(processWithPrimacy(request),
willThrow(PrimaryReplicaMissException.class));
}
@ParameterizedTest
@@ -2771,21 +2768,12 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
PrimaryReplicaRequest request = mock(requestClass);
when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
- assertThat(partitionReplicaListener.invoke(request, localNode.id()),
willThrow(PrimaryReplicaMissException.class));
+ assertThat(processWithPrimacy(request),
willThrow(PrimaryReplicaMissException.class));
}
- @ParameterizedTest
- @ValueSource(classes = {PrimaryReplicaRequest.class,
TxStateCommitPartitionRequest.class})
- void primaryReplicaRequestsAreRejectedWhenLeaseholderIsDifferent(Class<?
extends PrimaryReplicaRequest> requestClass) {
- long leaseStartTime = clock.nowLong();
- placementDriver.setPrimaryReplicaSupplier(
- () -> new TestReplicaMetaImpl(anotherNode,
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
- );
-
- PrimaryReplicaRequest request = mock(requestClass);
- when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
-
- assertThat(partitionReplicaListener.invoke(request, localNode.id()),
willThrow(PrimaryReplicaMissException.class));
+ private CompletableFuture<ReplicaResult> processWithPrimacy(ReplicaRequest
request) {
+ return primacyEngine.validatePrimacy(request)
+ .thenCompose(primacy ->
partitionReplicaListener.process(request, primacy, localNode.id()));
}
private static class RequestContext {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index f6a6185d4f5..89b21369687 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -116,6 +116,7 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
import
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
@@ -452,6 +453,8 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
@Mock
private IndexMetaStorage indexMetaStorage;
+ private ReplicaPrimacyEngine primacyEngine;
+
private static UUID nodeId(int id) {
return new UUID(0, id);
}
@@ -658,10 +661,8 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
),
validationSchemasSource,
- localNode,
schemaSyncService,
catalogService,
- placementDriver,
new SingleClusterNodeResolver(localNode),
new RemotelyTriggeredResourceRegistry(),
new DummySchemaManagerImpl(schemaDescriptor,
schemaDescriptorVersion2),
@@ -675,6 +676,8 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
when(lowWatermark.tryLock(any(), any())).thenReturn(true);
+ primacyEngine = new ReplicaPrimacyEngine(placementDriver,
clockService, grpId, localNode);
+
reset();
}
@@ -1881,7 +1884,12 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
.timestamp(clock.now())
.build();
- return tableReplicaProcessor.invoke(message, localNode.id());
+ return processWithPrimacy(message);
+ }
+
+ private CompletableFuture<ReplicaResult> processWithPrimacy(ReplicaRequest
request) {
+ return primacyEngine.validatePrimacy(request)
+ .thenCompose(primacy -> tableReplicaProcessor.process(request,
primacy, localNode.id()));
}
private CompletableFuture<ReplicaResult>
doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 8a46eae550f..01e81855b8f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -44,6 +44,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -74,12 +75,12 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.leases.Lease;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
-import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -250,13 +251,30 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
))
.collect(toList());
+ Map<ZonePartitionId, ReplicaPrimacyEngine> primacyEngines = new
HashMap<>();
+ for (int partitionIndex = 0; partitionIndex < PARTITIONS_NUM;
partitionIndex++) {
+ var zonePartitionId = new ZonePartitionId(ZONE_ID, partitionIndex);
+
+ var primacyEngine = new ReplicaPrimacyEngine(
+ placementDriver,
+ clockService,
+ zonePartitionId,
+ node
+ );
+
+ primacyEngines.put(zonePartitionId, primacyEngine);
+ }
+
lenient().doAnswer(invocation -> {
ReplicaRequest request = invocation.getArgument(1);
- var tablePartitionId = (PartitionGroupId)
request.groupId().asReplicationGroupId();
+ var zonePartitionId = (ZonePartitionId)
request.groupId().asReplicationGroupId();
- return
partitionReplicaListeners.get(tablePartitionId.partitionId())
- .invoke(request, node.id())
+ return primacyEngines.get(zonePartitionId)
+ .validatePrimacy(request)
+ .thenCompose(
+ primacy ->
partitionReplicaListeners.get(zonePartitionId.partitionId()).process(request,
primacy, node.id())
+ )
.thenApply(replicaResult -> new ReplicaMessagesFactory()
.replicaResponse()
.result(replicaResult.result())
@@ -321,10 +339,8 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
transactionStateResolver,
storageUpdateHandler,
validationSchemasSource,
- node,
schemaSyncService,
catalogService,
- placementDriver,
clusterNodeResolver,
remotelyTriggeredResourceRegistry,
schemaRegistry,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 2ec4dea377d..ae79a299b63 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -1058,10 +1058,8 @@ public class ItTxTestCluster {
transactionStateResolver,
storageUpdateHandler,
validationSchemasSource,
- localNode,
schemaSyncService,
catalogService,
- placementDriver,
clusterNodeResolver,
resourcesRegistry,
schemaRegistry,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0caf6b7f99d..f4306d09da0 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -486,10 +486,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
transactionStateResolver,
storageUpdateHandler,
new DummyValidationSchemasSource(schemaManager),
- LOCAL_NODE,
new AlwaysSyncedSchemaSyncService(),
catalogService,
- placementDriver,
mock(ClusterNodeResolver.class),
resourcesRegistry,
schemaManager,