This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f6c022a822 IGNITE-22218 Remove TableRaftService in favor of using RaftGroupService from Replica instances (#3973)
f6c022a822 is described below
commit f6c022a822c8e1185d65464dda0af9be23468b80
Author: Mikhail Efremov <[email protected]>
AuthorDate: Mon Aug 5 18:00:13 2024 +0600
IGNITE-22218 Remove TableRaftService in favor of using RaftGroupService from Replica instances (#3973)
---
.../ignite/client/fakes/FakeInternalTable.java | 7 -
modules/index/build.gradle | 1 +
.../replicator/ItReplicaLifecycleTest.java | 14 +-
.../PartitionReplicaLifecycleManager.java | 225 ++++++++++++++++-----
.../ItPrimaryReplicaChoiceTest.java | 17 +-
.../ItPlacementDriverReplicaSideTest.java | 1 -
.../ignite/internal/replicator/ReplicaManager.java | 19 +-
.../internal/replicator/ReplicaManagerTest.java | 1 -
.../internal/replicator/ReplicaTestUtils.java | 139 +++++++++++++
modules/runner/build.gradle | 1 +
.../app/ItIgniteInMemoryNodeRestartTest.java | 51 +++--
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../ItRaftCommandLeftInLogUntilRestartTest.java | 9 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 11 +-
.../exec/rel/TableScanNodeExecutionTest.java | 9 -
modules/table/build.gradle | 2 +
.../ItAbstractInternalTableScanTest.java | 59 +++++-
.../ItInternalTableReadOnlyOperationsTest.java | 15 +-
.../ItInternalTableReadWriteScanTest.java | 11 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 2 -
.../rebalance/ItRebalanceDistributedTest.java | 108 +++++-----
.../ignite/internal/table/ItColocationTest.java | 2 -
.../ignite/internal/table/InternalTable.java | 7 -
.../apache/ignite/internal/table/TableImpl.java | 6 -
.../ignite/internal/table/TableRaftService.java | 52 -----
.../ignite/internal/table/TableViewInternal.java | 10 -
.../internal/table/distributed/TableManager.java | 183 +++++++++--------
.../distributed/storage/InternalTableImpl.java | 15 +-
.../distributed/storage/TableRaftServiceImpl.java | 167 ---------------
.../distributed/TableManagerRecoveryTest.java | 6 +-
.../table/distributed/TableManagerTest.java | 2 +-
.../storage/InternalTableEstimatedSizeTest.java | 2 -
.../distributed/storage/InternalTableImplTest.java | 3 -
.../apache/ignite/distributed/ItTxTestCluster.java | 6 +-
.../ignite/internal/table/TxAbstractTest.java | 8 +-
.../table/impl/DummyInternalTableImpl.java | 21 +-
36 files changed, 661 insertions(+), 535 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 6c276c030c..2789d783dc 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
-import org.apache.ignite.internal.table.TableRaftService;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -461,12 +460,6 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
throw new IgniteInternalException(new
OperationNotSupportedException());
}
- @Override
- public TableRaftService tableRaftService() {
- throw new IgniteInternalException(new
OperationNotSupportedException());
- }
-
-
@Override public TxStateTableStorage txStateStorage() {
return null;
}
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 0693fbe8c8..e66f790bcf 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -81,6 +81,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-table'))
integrationTestImplementation testFixtures(project(':ignite-storage-api'))
integrationTestImplementation testFixtures(project(':ignite-catalog'))
+ integrationTestImplementation testFixtures(project(':ignite-replicator'))
integrationTestImplementation libs.jetbrains.annotations
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 50eb3d999f..40e4ebbe97 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -118,6 +118,7 @@ import
org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -176,6 +177,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
@@ -304,6 +306,8 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name),
"cluster");
+ placementDriver.setPrimary(node0.toClusterNode());
+
nodes.values().forEach(Node::waitWatches);
assertThat(
@@ -346,6 +350,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-22928")
public void testZoneReplicaListener(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);
@@ -1125,8 +1130,9 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
lowWatermark,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler,
- threadPoolsManager.partitionOperationsExecutor()
- );
+ threadPoolsManager.partitionOperationsExecutor(),
+ clockService,
+ placementDriver);
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
@@ -1280,6 +1286,10 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
nodeCfgGenerator.close();
clusterCfgGenerator.close();
}
+
+ ClusterNode toClusterNode() {
+ return new ClusterNodeImpl(name, name, networkAddress);
+ }
}
@FunctionalInterface
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index d5b7a54510..529c625c21 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -28,6 +28,8 @@ import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener.handleReduceChanged;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
@@ -38,6 +40,7 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -53,8 +56,8 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,6 +70,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
+import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -78,8 +82,9 @@ import
org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
-import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
+import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
+import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -98,12 +103,12 @@ import
org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -169,6 +174,15 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
*/
private final Executor partitionOperationsExecutor;
+ /** Clock service used for primary replica checks. */
+ private final ClockService clockService;
+
+ /** Placement driver for primary replicas checks. */
+ private final PlacementDriver placementDriver;
+
+ /** A predicate that checks whether the given assignment corresponds to the local node. */
+ private final Predicate<Assignment> isLocalNodeAssignment = assignment ->
assignment.consistentId().equals(localNode().name());
+
/**
* The constructor.
*
@@ -180,6 +194,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
* @param rebalanceScheduler Executor for scheduling rebalance routine.
* @param partitionOperationsExecutor Striped executor on which partition
operations (potentially requiring I/O with storages)
* will be executed.
+ * @param clockService Clock service.
+ * @param placementDriver Placement driver.
*/
public PartitionReplicaLifecycleManager(
CatalogManager catalogMgr,
@@ -190,7 +206,9 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
LowWatermark lowWatermark,
ExecutorService ioExecutor,
ScheduledExecutorService rebalanceScheduler,
- Executor partitionOperationsExecutor
+ Executor partitionOperationsExecutor,
+ ClockService clockService,
+ PlacementDriver placementDriver
) {
this.catalogMgr = catalogMgr;
this.replicaMgr = replicaMgr;
@@ -201,6 +219,9 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
this.ioExecutor = ioExecutor;
this.rebalanceScheduler = rebalanceScheduler;
this.partitionOperationsExecutor = partitionOperationsExecutor;
+ this.clockService = clockService;
+
+ this.placementDriver = placementDriver;
pendingAssignmentsRebalanceListener =
createPendingAssignmentsRebalanceListener();
stableAssignmentsRebalanceListener =
createStableAssignmentsRebalanceListener();
@@ -746,7 +767,6 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
? emptySet()
:
Assignments.fromBytes(stableAssignmentsWatchEvent.value()).nodes();
-
return supplyAsync(() -> {
Entry pendingAssignmentsEntry =
metaStorageMgr.getLocally(pendingPartAssignmentsKey(zonePartitionId), revision);
@@ -769,13 +789,22 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
ZonePartitionId zonePartitionId,
Set<Assignment> stableAssignments
) {
- // Update raft client peers and learners according to the actual
assignments.
- if (replicaMgr.isReplicaStarted(zonePartitionId)) {
- replicaMgr.replica(zonePartitionId).join()
-
.raftClient().updateConfiguration(fromAssignments(stableAssignments));
- }
+ return isLocalNodeIsPrimary(zonePartitionId).thenCompose(isLeaseholder
-> inBusyLock(busyLock, () -> {
+ boolean isLocalInStable =
isLocalNodeInAssignments(stableAssignments);
- return nullCompletedFuture();
+ if (!isLocalInStable && !isLeaseholder) {
+ return nullCompletedFuture();
+ }
+
+ assert replicaMgr.isReplicaStarted(zonePartitionId)
+ : "The local node is outside of the replication group
[stable=" + stableAssignments
+ + ", isLeaseholder=" + isLeaseholder + "].";
+
+ // Update raft client peers and learners according to the actual
assignments.
+ return replicaMgr.replica(zonePartitionId)
+ .thenApply(Replica::raftClient)
+ .thenAccept(raftClient ->
raftClient.updateConfiguration(fromAssignments(stableAssignments)));
+ }));
}
private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
@@ -796,10 +825,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
.noneMatch(assignment ->
assignment.consistentId().equals(localNode().name()));
if (shouldStopLocalServices) {
- return allOf(
- clientUpdateFuture,
- stopAndDestroyPartition(zonePartitionId)
- );
+ return clientUpdateFuture.thenCompose(v ->
stopAndDestroyPartition(zonePartitionId))
+ .thenAccept(v -> { });
} else {
return clientUpdateFuture;
}
@@ -844,14 +871,26 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
return handleChangePendingAssignmentEvent(
zonePartitionId,
stableAssignments,
- pendingAssignments
- ).thenCompose(v -> changePeersOnRebalance(
- replicaMgr,
- zonePartitionId,
- pendingAssignments.nodes(),
- stableAssignments == null ? emptySet() :
stableAssignments.nodes(),
- revision)
- );
+ pendingAssignments,
+ revision
+ ).thenCompose(v -> {
+ boolean isLocalNodeInStableOrPending =
isNodeInReducedStableOrPendingAssignments(
+ zonePartitionId,
+ stableAssignments,
+ pendingAssignments,
+ revision
+ );
+
+ if (!isLocalNodeInStableOrPending) {
+ return nullCompletedFuture();
+ }
+
+ return changePeersOnRebalance(
+ replicaMgr,
+ zonePartitionId,
+ pendingAssignments.nodes(),
+ revision);
+ });
} finally {
busyLock.leaveBusy();
}
@@ -860,7 +899,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
private CompletableFuture<Void> handleChangePendingAssignmentEvent(
ZonePartitionId replicaGrpId,
@Nullable Assignments stableAssignments,
- Assignments pendingAssignments
+ Assignments pendingAssignments,
+ long revision
) {
boolean pendingAssignmentsAreForced = pendingAssignments.force();
Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
@@ -912,41 +952,45 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
localServicesStartFuture = nullCompletedFuture();
}
- return localServicesStartFuture.thenRunAsync(() ->
inBusyLock(busyLock, () -> {
- if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
- return;
- }
+ return localServicesStartFuture
+ .thenComposeAsync(v -> inBusyLock(busyLock, () ->
isLocalNodeIsPrimary(replicaGrpId)), ioExecutor)
+ .thenAcceptAsync(isLeaseholder -> inBusyLock(busyLock, () -> {
+ boolean isLocalNodeInStableOrPending =
isNodeInReducedStableOrPendingAssignments(
+ replicaGrpId,
+ stableAssignments,
+ pendingAssignments,
+ revision
+ );
- // For forced assignments, we exclude dead stable nodes, and all
alive stable nodes are already in pending assignments.
- // Union is not required in such a case.
- Set<Assignment> newAssignments = pendingAssignmentsAreForced ||
stableAssignments == null
- ? pendingAssignmentsNodes
- : RebalanceUtil.union(pendingAssignmentsNodes,
stableAssignments.nodes());
+ if (!isLocalNodeInStableOrPending && !isLeaseholder) {
+ return;
+ }
-
replicaMgr.replica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
- }), ioExecutor);
+ assert isLocalNodeInStableOrPending || isLeaseholder
+ : "The local node is outside of the replication
group [inStableOrPending=" + isLocalNodeInStableOrPending
+ + ", isLeaseholder=" + isLeaseholder + "].";
+
+ // For forced assignments, we exclude dead stable nodes,
and all alive stable nodes are already in pending assignments.
+ // Union is not required in such a case.
+ Set<Assignment> newAssignments =
pendingAssignmentsAreForced || stableAssignments == null
+ ? pendingAssignmentsNodes
+ : union(pendingAssignmentsNodes,
stableAssignments.nodes());
+
+ replicaMgr.replica(replicaGrpId)
+ .thenApply(Replica::raftClient)
+ .thenAccept(raftClient ->
raftClient.updateConfiguration(fromAssignments(newAssignments)));
+ }), ioExecutor);
}
private CompletableFuture<Void> changePeersOnRebalance(
ReplicaManager replicaMgr,
ZonePartitionId replicaGrpId,
Set<Assignment> pendingAssignments,
- Set<Assignment> stableAssignments,
long revision
) {
- Set<Assignment> union = new HashSet<>();
- union.addAll(pendingAssignments);
- union.addAll(stableAssignments);
-
- if
(!union.stream().map(Assignment::consistentId).collect(toSet()).contains(localNode().name()))
{
- return nullCompletedFuture();
- }
-
- int partId = replicaGrpId.partitionId();
-
- RaftGroupService partGrpSvc =
replicaMgr.replica(replicaGrpId).join().raftClient();
-
- return partGrpSvc.refreshAndGetLeaderWithTerm()
+ return replicaMgr.replica(replicaGrpId)
+ .thenApply(Replica::raftClient)
+ .thenCompose(raftClient ->
raftClient.refreshAndGetLeaderWithTerm()
.exceptionally(throwable -> {
throwable = unwrapCause(throwable);
@@ -970,7 +1014,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
// run update of raft configuration if this node is a
leader
LOG.info("Current node={} is the leader of partition raft
group={}. "
+ "Initiate rebalance process for
partition={}, zoneId={}",
- leaderWithTerm.leader(), replicaGrpId, partId,
replicaGrpId.zoneId());
+ leaderWithTerm.leader(), replicaGrpId,
replicaGrpId.partitionId(), replicaGrpId.zoneId());
return
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
.thenCompose(latestPendingAssignmentsEntry -> {
@@ -983,19 +1027,96 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
PeersAndLearners newConfiguration =
fromAssignments(pendingAssignments);
- CompletableFuture<Void> voidCompletableFuture
= partGrpSvc.changePeersAndLearnersAsync(newConfiguration,
+ CompletableFuture<Void> voidCompletableFuture
= raftClient.changePeersAndLearnersAsync(newConfiguration,
leaderWithTerm.term()).exceptionally(e
-> {
return null;
});
return voidCompletableFuture;
});
- });
+ }));
}
private boolean isLocalPeer(Peer peer) {
return peer.consistentId().equals(localNode().name());
}
+ private boolean isLocalNodeInAssignments(Collection<Assignment>
assignments) {
+ return assignments.stream().anyMatch(isLocalNodeAssignment);
+ }
+
+ /**
+ * Checks whether the local node is the primary replica (leaseholder) for the given replication group.
+ * <br>
+ * Internally this calls {@link PlacementDriver#getPrimaryReplica} with a slightly earlier safe time value, because the metastore waits
+ * for pending or stable assignments event handling to finish and only then increments its safe time. Meanwhile, the placement driver
+ * internally waits for the metastore to reach the given safe time plus {@link ClockService#maxClockSkewMillis}. So, if the given time
+ * were just {@link ClockService#now}, there would be a deadlock: the metastore would wait until assignments handling is over, while
+ * the placement driver would wait for a safe time that has not been applied yet.
+ * <br>
+ * To solve this, we pass to {@link PlacementDriver#getPrimaryReplica} the current metastore safe time minus the maximum clock skew,
+ * so the placement driver can successfully obtain the primary replica for a timestamp before the handling began. There is also a
+ * corner case in tests that use {@code WatchListenerInhibitor#metastorageEventsInhibitor}: the current safe time may then equal
+ * {@link HybridTimestamp#MIN_VALUE}, and subtracting the skew would cause an {@link IllegalArgumentException} from
+ * {@link HybridTimestamp}. If we get the minimal possible timestamp, there cannot be any primary replica yet, so {@code false} is
+ * returned.
+ *
+ * @param replicationGroupId ID of the replication group for which to check whether the local node is the primary.
+ * @return Future that completes with {@code true} if the local node is the primary and {@code false} otherwise.
+ */
+ private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId
replicationGroupId) {
+ HybridTimestamp currentSafeTime =
metaStorageMgr.clusterTime().currentSafeTime();
+
+ if (HybridTimestamp.MIN_VALUE.equals(currentSafeTime)) {
+ return falseCompletedFuture();
+ }
+
+ long skewMs = clockService.maxClockSkewMillis();
+
+ try {
+ HybridTimestamp previousMetastoreSafeTime =
currentSafeTime.subtractPhysicalTime(skewMs);
+
+ return placementDriver.getPrimaryReplica(replicationGroupId,
previousMetastoreSafeTime)
+ .thenApply(replicaMeta -> replicaMeta != null
+ && replicaMeta.getLeaseholderId() != null
+ &&
replicaMeta.getLeaseholderId().equals(localNode().id()));
+ } catch (IllegalArgumentException e) {
+ long currentSafeTimeMs = currentSafeTime.longValue();
+
+ throw new AssertionError("Got a negative time [currentSafeTime=" +
currentSafeTime
+ + ", currentSafeTimeMs=" + currentSafeTimeMs
+ + ", skewMs=" + skewMs
+ + ", internal=" + (currentSafeTimeMs + ((-skewMs) <<
LOGICAL_TIME_BITS_SIZE)) + "]", e);
+ }
+ }
+
+ private boolean isNodeInReducedStableOrPendingAssignments(
+ ZonePartitionId replicaGrpId,
+ @Nullable Assignments stableAssignments,
+ Assignments pendingAssignments,
+ long revision
+ ) {
+ Entry reduceEntry =
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId),
revision);
+
+ Assignments reduceAssignments = reduceEntry != null
+ ? Assignments.fromBytes(reduceEntry.value())
+ : null;
+
+ Set<Assignment> reducedStableAssignments = reduceAssignments != null
+ ? subtract(stableAssignments.nodes(),
reduceAssignments.nodes())
+ : stableAssignments.nodes();
+
+ if (!isLocalNodeInAssignments(union(reducedStableAssignments,
pendingAssignments.nodes()))) {
+ return false;
+ }
+
+ assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local node is
outside of the replication group ["
+ + ", stable=" + stableAssignments
+ + ", pending=" + pendingAssignments
+ + ", reduce=" + reduceAssignments
+ + ", localName=" + localNode().name() + "].";
+
+ return true;
+ }
+
@Nullable
private Assignment localMemberAssignment(Assignments assignments) {
Assignment localMemberAssignment =
Assignment.forPeer(localNode().name());
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index c6523efc82..269d926085 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -154,13 +155,14 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
@Test
public void testPrimaryChangeLongHandling() throws Exception {
- TableViewInternal tbl =
unwrapTableImpl(node(0).tables().table(TABLE_NAME));
+ IgniteImpl node = node(0);
+ TableViewInternal tbl =
unwrapTableImpl(node.tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
+ CompletableFuture<ReplicaMeta> primaryReplicaFut =
node.placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
- node(0).clock().now(),
+ node.clock().now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT,
SECONDS
);
@@ -184,7 +186,7 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<String> primaryChangeTask =
IgniteTestUtils.runAsync(() ->
NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
- waitingForLeaderCache(tbl, primary);
+ waitingForLeaderCache(node, tbl);
assertFalse(primaryChangeTask.isDone());
@@ -399,12 +401,13 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
/**
* Waits when the leader would be a different with the current primary
replica.
*
+ * @param node Ignite node.
* @param tbl Table.
- * @param primary Current primary replica name.
* @throws InterruptedException If fail.
*/
- private static void waitingForLeaderCache(TableViewInternal tbl, String
primary) throws InterruptedException {
- RaftGroupService raftSrvc =
tbl.internalTable().tableRaftService().partitionRaftGroupService(0);
+ private static void waitingForLeaderCache(IgniteImpl node,
TableViewInternal tbl) throws InterruptedException {
+ RaftGroupService raftSrvc = ReplicaTestUtils.getRaftClient(node,
tbl.tableId(), 0)
+ .orElseThrow(AssertionError::new);
assertTrue(waitForCondition(() -> {
raftSrvc.refreshLeader();
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 51f54bd185..5b88e751bd 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -528,7 +528,6 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
return replicaManager.startReplica(
groupId,
newConfiguration,
- (unused) -> { },
(unused) -> listener,
new
PendingComparableValuesTracker<>(Long.MAX_VALUE),
completedFuture(raftClient));
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 344b966318..566a01ae0a 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -53,7 +53,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@@ -578,7 +577,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
RaftGroupListener raftGroupListener,
boolean isVolatileStorage,
SnapshotStorageFactory snapshotStorageFactory,
- Consumer<RaftGroupService> updateTableRaftService,
Function<RaftGroupService, ReplicaListener> createListener,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
TablePartitionId replicaGrpId,
@@ -604,7 +602,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return startReplica(
replicaGrpId,
newConfiguration,
- updateTableRaftService,
createListener,
storageIndexTracker,
newRaftClientFut
@@ -618,8 +615,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param raftGroupListener Raft group listener for raft group starting.
* @param isVolatileStorage is table storage volatile?
* @param snapshotStorageFactory Snapshot storage factory for raft group
option's parameterization.
- * @param updateTableRaftService Temporal consumer while TableRaftService
wouldn't be removed in
- * TODO: https://issues.apache.org/jira/browse/IGNITE-22218.
* @param createListener Due to creation of ReplicaListener in
TableManager, the function returns desired listener by created
* raft-client inside {@link #startReplica} method.
* @param replicaGrpId Replication group id.
@@ -633,7 +628,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
RaftGroupListener raftGroupListener,
boolean isVolatileStorage,
SnapshotStorageFactory snapshotStorageFactory,
- Consumer<RaftGroupService> updateTableRaftService,
Function<RaftGroupService, ReplicaListener> createListener,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
TablePartitionId replicaGrpId,
@@ -649,7 +643,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
raftGroupListener,
isVolatileStorage,
snapshotStorageFactory,
- updateTableRaftService,
createListener,
storageIndexTracker,
replicaGrpId,
@@ -745,8 +738,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*
* @param replicaGrpId Replication group id.
* @param newConfiguration Peers and Learners of the Raft group.
- * @param updateTableRaftService A temporal clojure that updates table
raft service with new raft-client, but
- * TODO: will be removed
https://issues.apache.org/jira/browse/IGNITE-22218
* @param createListener A clojure that returns done {@link
ReplicaListener} by given raft-client {@link RaftGroupService}.
* @param storageIndexTracker Storage index tracker.
* @param newRaftClientFut A future that returns created raft-client.
@@ -758,7 +749,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
public CompletableFuture<Replica> startReplica(
ReplicationGroupId replicaGrpId,
PeersAndLearners newConfiguration,
- Consumer<RaftGroupService> updateTableRaftService,
Function<RaftGroupService, ReplicaListener> createListener,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut
@@ -766,11 +756,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
return newRaftClientFut
- .thenApplyAsync(raftClient -> {
- // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22218
- updateTableRaftService.accept(raftClient);
- return createListener.apply(raftClient);
- }, replicasCreationExecutor)
+ .thenApplyAsync(createListener, replicasCreationExecutor)
.thenCompose(replicaListener -> startReplica(replicaGrpId,
storageIndexTracker, completedFuture(replicaListener)));
}
@@ -1147,6 +1133,9 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param replicaGrpId Replication group id.
* @return True if the replica is started.
*/
+ @TestOnly
+ @VisibleForTesting
+ @Deprecated
public boolean isReplicaStarted(ReplicationGroupId replicaGrpId) {
CompletableFuture<Replica> replicaFuture = replicas.get(replicaGrpId);
return replicaFuture != null && isCompletedSuccessfully(replicaFuture);
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index c7999187c5..380234b89d 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -183,7 +183,6 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
CompletableFuture<Replica> startReplicaFuture =
replicaManager.startReplica(
groupId,
newConfiguration,
- (unused) -> { },
(unused) -> replicaListener,
new PendingComparableValuesTracker<>(0L),
completedFuture(raftGroupService)
diff --git
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/replicator/ReplicaTestUtils.java
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/replicator/ReplicaTestUtils.java
new file mode 100644
index 0000000000..8a063a3270
--- /dev/null
+++
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/replicator/ReplicaTestUtils.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.TestOnly;
+
+/** Utilities for working with replicas and replicas manager in tests. */
+public final class ReplicaTestUtils {
+ /**
+ * Returns raft-client if exists.
+ *
+ * @param node Ignite node that hosts the raft-client.
+ * @param tableId Desired table's ID.
+ * @param partId Desired partition's ID.
+ *
+ * @return Optional with raft-client if exists on the node by given
identifiers.
+ */
+ @TestOnly
+ public static Optional<RaftGroupService> getRaftClient(Ignite node, int
tableId, int partId) {
+ return getRaftClient(getReplicaManager(node), tableId, partId);
+ }
+
+ /**
+ * Returns raft-client if exists.
+ *
+ * @param replicaManager Ignite node's replica manager with replica that
should contains a raft client.
+ * @param tableId Desired table's ID.
+ * @param partId Desired partition's ID.
+ *
+ * @return Optional with raft-client if exists on the node by given
identifiers.
+ */
+ @TestOnly
+ public static Optional<RaftGroupService> getRaftClient(ReplicaManager
replicaManager, int tableId, int partId) {
+ CompletableFuture<Replica> replicaFut = replicaManager
+ .replica(new TablePartitionId(tableId, partId));
+
+ if (replicaFut == null) {
+ return Optional.empty();
+ }
+
+ try {
+ return Optional.of(replicaFut.get(15,
TimeUnit.SECONDS).raftClient());
+ } catch (ExecutionException | InterruptedException | TimeoutException
e) {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Extracts {@link ReplicaManager} from the given {@link Ignite} node.
+ *
+ * @param node The given node with desired replica manager.
+ *
+ * @return Replica manager component from given node.
+ */
+ @TestOnly
+ public static ReplicaManager getReplicaManager(Ignite node) {
+ return IgniteTestUtils.getFieldValue(node, "replicaMgr");
+ }
+
+ /**
+ * Extracts {@link TopologyService} from the given {@link Ignite} node.
+ *
+ * @param node The given node with desired topology service.
+ *
+ * @return Topology service component from given node.
+ */
+ @TestOnly
+ private static TopologyService getTopologyService(Ignite node) {
+ ClusterService clusterService = IgniteTestUtils.getFieldValue(node,
"clusterSvc");
+ return clusterService.topologyService();
+ }
+
+ /**
+ * Returns cluster node that is the leader of the corresponding partition
group or throws an exception if it cannot be found.
+ *
+ * @param node Ignite node with raft client.
+ * @param tableId Table identifier.
+ * @param partId Partition number.
+ *
+ * @return Leader node of the partition group corresponding to the
partition
+ */
+ @TestOnly
+ public static ClusterNode leaderAssignment(Ignite node, int tableId, int
partId) {
+ return leaderAssignment(getReplicaManager(node),
getTopologyService(node), tableId, partId);
+ }
+
+ /**
+ * Returns cluster node that is the leader of the corresponding partition
group or throws an exception if it cannot be found.
+ *
+ * @param replicaManager Ignite node's replica manager with replica that
should contains a raft client.
+ * @param topologyService Ignite node's topology service that should find
and return leader cluster node.
+ * @param tableId Table identifier.
+ * @param partId Partition number.
+ *
+ * @return Leader node of the partition group corresponding to the
partition
+ */
+ @TestOnly
+ public static ClusterNode leaderAssignment(ReplicaManager replicaManager,
TopologyService topologyService, int tableId, int partId) {
+ RaftGroupService raftClient = getRaftClient(replicaManager, tableId,
partId)
+ .orElseThrow(() -> new IgniteInternalException("No such
partition " + partId + " in table " + tableId));
+
+ if (raftClient.leader() == null) {
+ try {
+ raftClient.refreshLeader().get(15, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ throw new IgniteInternalException("Couldn't get a leader for
partition " + partId + " in table " + tableId, e);
+ }
+ }
+
+ return
topologyService.getByConsistentId(raftClient.leader().consistentId());
+ }
+}
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 5e66356dc3..8ad1bc82d8 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -183,6 +183,7 @@ dependencies {
integrationTestImplementation
testFixtures(project(':ignite-failure-handler'))
integrationTestImplementation testFixtures(project(':ignite-metrics:'))
integrationTestImplementation testFixtures(project(':ignite-raft'))
+ integrationTestImplementation testFixtures(project(':ignite-replicator'))
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.rocksdb.jni
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 4e479fbe90..4ec569d59d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -30,11 +30,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServer;
import org.apache.ignite.InitParameters;
@@ -42,8 +48,9 @@ import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.TableViewInternal;
@@ -171,9 +178,10 @@ public class ItIgniteInMemoryNodeRestartTest extends
BaseIgniteRestartTest {
TableViewInternal table =
unwrapTableViewInternal(ignite.tables().table(TABLE_NAME));
// Find the leader of the table's partition group.
- RaftGroupService raftGroupService =
table.internalTable().tableRaftService().partitionRaftGroupService(0);
- LeaderWithTerm leaderWithTerm =
raftGroupService.refreshAndGetLeaderWithTerm().join();
- String leaderId = leaderWithTerm.leader().consistentId();
+ String leaderId = ignite.replicaManager()
+ .replica(new TablePartitionId(table.tableId(), 0))
+ .thenApply(replica ->
replica.raftClient().leader().consistentId())
+ .get(15, TimeUnit.SECONDS);
log.info("Leader is {}", leaderId);
@@ -196,17 +204,17 @@ public class ItIgniteInMemoryNodeRestartTest extends
BaseIgniteRestartTest {
String restartingNodeConsistentId = restartingNode.name();
TableViewInternal restartingTable =
unwrapTableViewInternal(restartingNode.tables().table(TABLE_NAME));
- InternalTableImpl internalTable = (InternalTableImpl)
restartingTable.internalTable();
+ InternalTableImpl restartingInternalTable = (InternalTableImpl)
restartingTable.internalTable();
// Check that it restarts.
waitForCondition(
- () -> isRaftNodeStarted(table, loza) &&
solePartitionAssignmentsContain(restartingNodeConsistentId, internalTable),
+ () -> isRaftNodeStarted(table, loza) &&
solePartitionAssignmentsContain(restartingNode, restartingInternalTable, 0),
TimeUnit.SECONDS.toMillis(10)
);
assertTrue(isRaftNodeStarted(table, loza), "Raft node of the partition
is not started on " + restartingNodeConsistentId);
assertTrue(
- solePartitionAssignmentsContain(restartingNodeConsistentId,
internalTable),
+ solePartitionAssignmentsContain(restartingNode,
restartingInternalTable, 0),
"Assignments do not contain node " + restartingNodeConsistentId
);
@@ -214,14 +222,29 @@ public class ItIgniteInMemoryNodeRestartTest extends
BaseIgniteRestartTest {
checkTableWithData(restartingNode, TABLE_NAME);
}
-    private static boolean solePartitionAssignmentsContain(String restartingNodeConsistentId, InternalTableImpl internalTable) {
-        Map<Integer, List<String>> assignments = internalTable.tableRaftService().peersAndLearners();
+    /**
+     * Checks whether the restarting node is listed among the peers or learners known to its own replica's raft-client.
+     *
+     * @param restartingNode Node whose replica manager is queried; its name is used as the consistent ID to look for.
+     * @param table Table the partition belongs to.
+     * @param partId Partition ID.
+     * @return {@code true} if the node is among the partition group's peers or learners; {@code false} if it is not, if the
+     *      replica is absent or did not start in time, or if the calling thread was interrupted (the flag is restored).
+     */
+    private static boolean solePartitionAssignmentsContain(IgniteImpl restartingNode, InternalTableImpl table, int partId) {
+        String restartingNodeConsistentId = restartingNode.name();
+
+        TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), partId);
-        List<String> partitionAssignments = assignments.get(0);
+        CompletableFuture<Replica> replicaFut = restartingNode.replicaManager().replica(tablePartitionId);
-        return !assignments.isEmpty()
-                && partitionAssignments != null
-                && partitionAssignments.contains(restartingNodeConsistentId);
+        if (replicaFut == null) {
+            return false;
+        }
+
+        try {
+            RaftGroupService raftClient = replicaFut.get(15, TimeUnit.SECONDS).raftClient();
+
+            return Stream.of(raftClient.peers(), raftClient.learners())
+                    .filter(Objects::nonNull)
+                    .flatMap(Collection::stream)
+                    .map(Peer::consistentId)
+                    .collect(Collectors.toSet())
+                    .contains(restartingNodeConsistentId);
+        } catch (InterruptedException e) {
+            // This predicate is polled by waitForCondition: keep the interruption visible so the wait aborts.
+            Thread.currentThread().interrupt();
+
+            return false;
+        } catch (ExecutionException | TimeoutException e) {
+            return false;
+        }
     }
private static boolean isRaftNodeStarted(TableViewInternal table, Loza
loza) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 159702e537..c06c0deff5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -679,7 +679,9 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
lowWatermark,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler,
- threadPoolsManager.partitionOperationsExecutor()
+ threadPoolsManager.partitionOperationsExecutor(),
+ clockService,
+ placementDriverManager.placementDriver()
)
);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 51b1135e20..b1ee929d55 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -170,7 +171,7 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassInteg
TableViewInternal table = (TableViewInternal)
createTable(DEFAULT_TABLE_NAME, 2, 1);
- ClusterNode leader =
table.internalTable().tableRaftService().leaderAssignment(0);
+ ClusterNode leader = ReplicaTestUtils.leaderAssignment(node0,
table.tableId(), 0);
boolean isNode0Leader = node0.id().equals(leader.id());
@@ -189,7 +190,8 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassInteg
assertTrue(IgniteTestUtils.waitForCondition(() ->
appliedIndexNode0.get() == appliedIndexNode1.get(), 10_000));
- RaftGroupService raftGroupService =
table.internalTable().tableRaftService().partitionRaftGroupService(0);
+ RaftGroupService raftGroupService =
ReplicaTestUtils.getRaftClient(node0, table.tableId(), 0)
+ .orElseThrow(AssertionError::new);
raftGroupService.peers().forEach(peer ->
assertThat(raftGroupService.snapshot(peer), willCompleteSuccessfully()));
@@ -341,7 +343,8 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassInteg
private void transferLeadershipToLocalNode(IgniteImpl ignite) {
TableViewInternal table = (TableViewInternal)
ignite.tables().table(DEFAULT_TABLE_NAME);
- RaftGroupService raftGroupService =
table.internalTable().tableRaftService().partitionRaftGroupService(0);
+ RaftGroupService raftGroupService =
ReplicaTestUtils.getRaftClient(ignite, table.tableId(), 0)
+ .orElseThrow(AssertionError::new);
List<Peer> peers = raftGroupService.peers();
assertNotNull(peers);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 28bcdc769e..47ce9163ca 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -784,7 +784,9 @@ public class IgniteImpl implements Ignite {
lowWatermark,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler,
- threadPoolsManager.partitionOperationsExecutor()
+ threadPoolsManager.partitionOperationsExecutor(),
+ clockService,
+ placementDriverMgr.placementDriver()
);
TransactionConfiguration txConfig =
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
@@ -1692,4 +1694,11 @@ public class IgniteImpl implements Ignite {
public LowWatermarkImpl lowWatermark() {
return lowWatermark;
}
+
+ /**
+ * Returns this node's replica manager.
+ *
+ * <p>Test-only accessor; production code is not expected to call it.
+ */
+ @TestOnly
+ public ReplicaManager replicaManager() {
+ return replicaMgr;
+ }
+
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 244b81ae94..eb6a1cec8d 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;
@@ -56,7 +55,6 @@ import
org.apache.ignite.internal.network.SingleClusterNodeResolver;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -76,7 +74,6 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -253,12 +250,6 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
mock(HybridClock.class),
timestampTracker,
mock(PlacementDriver.class),
- new TableRaftServiceImpl(
- "test",
- PART_CNT,
- Int2ObjectMaps.singleton(0,
mock(RaftGroupService.class)),
- new
SingleClusterNodeResolver(mock(ClusterNode.class))
- ),
mock(TransactionInflights.class),
3_000,
0,
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index ba33cc2bed..483949f30a 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -116,6 +116,7 @@ dependencies {
testFixturesImplementation(testFixtures(project(':ignite-failure-handler')))
testFixturesImplementation(testFixtures(project(':ignite-metrics')))
testFixturesImplementation(testFixtures(project(':ignite-raft')))
+ testFixturesImplementation(testFixtures(project(':ignite-replicator')))
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.fastutil.core
testFixturesImplementation libs.mockito.core
@@ -167,6 +168,7 @@ dependencies {
integrationTestImplementation(testFixtures(project(':ignite-failure-handler')))
integrationTestImplementation(testFixtures(project(':ignite-metrics')))
integrationTestImplementation(testFixtures(project(':ignite-sql-engine')))
+ integrationTestImplementation(testFixtures(project(':ignite-replicator')))
integrationTestImplementation libs.fastutil.core
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.calcite.core
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 0049db71ef..12572ef4e2 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -45,7 +45,12 @@ import java.util.concurrent.atomic.AtomicReference;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -63,6 +68,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -95,13 +101,44 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
/** Internal table to test. */
DummyInternalTableImpl internalTbl;
+ PlacementDriver placementDriver;
+
+ ClusterNodeResolver clusterNodeResolver;
+
/**
* Prepare test environment using DummyInternalTableImpl and Mocked
storage.
*/
@BeforeEach
public void setUp(TestInfo testInfo) {
- internalTbl = new DummyInternalTableImpl(mock(ReplicaService.class),
mockStorage, ROW_SCHEMA, txConfiguration,
- storageUpdateConfiguration);
+ // Resolver that knows exactly one node, DummyInternalTableImpl.LOCAL_NODE, matched either by its name
+ // (used as the consistent ID) or by its ID; every other lookup resolves to null.
+ clusterNodeResolver = new ClusterNodeResolver() {
+
+ private final ClusterNode singleNode =
DummyInternalTableImpl.LOCAL_NODE;
+
+ @Override
+ public @Nullable ClusterNode getByConsistentId(String
consistentId) {
+ return singleNode.name().equals(consistentId)
+ ? singleNode
+ : null;
+ }
+
+ @Override
+ public @Nullable ClusterNode getById(String id) {
+ return singleNode.id().equals(id)
+ ? singleNode
+ : null;
+ }
+ };
+
+ // Placement driver built around LOCAL_NODE; presumably it reports that node as the primary replica
+ // (TODO confirm against TestPlacementDriver). It is shared with the table below and with getPrimaryReplica().
+ placementDriver = new
TestPlacementDriver(DummyInternalTableImpl.LOCAL_NODE);
+
+ internalTbl = new DummyInternalTableImpl(
+ mock(ReplicaService.class),
+ placementDriver,
+ mockStorage,
+ ROW_SCHEMA,
+ txConfiguration,
+ storageUpdateConfiguration
+ );
}
/**
@@ -515,6 +552,24 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
);
}
+    /**
+     * Finds the node that holds the primary replica of the given replication group.
+     *
+     * <p>Blocks until the placement driver reports a primary replica (waiting up to
+     * {@code DummyInternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT} seconds), then resolves the leaseholder's consistent ID
+     * through {@link #clusterNodeResolver}.
+     *
+     * @param replicationGroupId Desired replication group ID.
+     * @return Primary replica {@link ClusterNode} for the group, or {@code null} if the resolver does not know the leaseholder.
+     */
+    protected ClusterNode getPrimaryReplica(ReplicationGroupId replicationGroupId) {
+        String leaseholder = placementDriver
+                .awaitPrimaryReplica(
+                        replicationGroupId,
+                        DummyInternalTableImpl.CLOCK.now(),
+                        DummyInternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                        TimeUnit.SECONDS
+                )
+                .thenApply(ReplicaMeta::getLeaseholder)
+                .join();
+
+        return clusterNodeResolver.getByConsistentId(leaseholder);
+    }
+
/**
* Either read-write or read-only publisher producer.
*
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 6ca4986d8c..20553e25a8 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -115,12 +117,23 @@ public class ItInternalTableReadOnlyOperationsTest
extends IgniteAbstractTest {
@Mock
private BinaryRowEx someRow;
+ private PlacementDriver placementDriver;
+
/**
* Prepare test environment using DummyInternalTableImpl and Mocked
storage.
*/
@BeforeEach
public void setUp(TestInfo testInfo) {
- internalTbl = new DummyInternalTableImpl(replicaService, mockStorage,
SCHEMA, txConfiguration, storageUpdateConfiguration);
+ placementDriver = new
TestPlacementDriver(DummyInternalTableImpl.LOCAL_NODE);
+
+ internalTbl = new DummyInternalTableImpl(
+ replicaService,
+ placementDriver,
+ mockStorage,
+ SCHEMA,
+ txConfiguration,
+ storageUpdateConfiguration
+ );
lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
lenient().when(readOnlyTx.readTimestamp()).thenReturn(CLOCK.now());
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 99cd51a6ec..5fba2bd011 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -21,12 +21,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.RollbackTxOnErrorPublisher;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -76,11 +74,14 @@ public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableSca
InternalTransaction tx =
internalTbl.txManager().begin(HYBRID_TIMESTAMP_TRACKER);
TablePartitionId tblPartId = new
TablePartitionId(internalTbl.tableId(), ((TablePartitionId)
internalTbl.groupId()).partitionId());
- RaftGroupService raftSvc =
internalTbl.tableRaftService().partitionRaftGroupService(tblPartId.partitionId());
- long term =
IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();
+
+ long term = 1L;
tx.assignCommitPartition(tblPartId);
- tx.enlist(tblPartId, new
IgniteBiTuple<>(internalTbl.tableRaftService().leaderAssignment(tblPartId.partitionId()),
term));
+
+ ClusterNode primaryReplicaNode = getPrimaryReplica(tblPartId);
+
+ tx.enlist(tblPartId, new IgniteBiTuple<>(primaryReplicaNode, term));
return tx;
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 0ada927361..aae2ca7d88 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -245,7 +245,6 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
replicaManager.startReplica(
tablePartitionId,
newConfiguration,
- (unused) -> { },
(unused) -> listener,
new PendingComparableValuesTracker<>(0L),
completedFuture(mock(TopologyAwareRaftGroupService.class))
@@ -363,7 +362,6 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
replicaManager.startReplica(
tablePartitionId,
newConfiguration,
- (unused) -> { },
(unused) -> listener,
new PendingComparableValuesTracker<>(0L),
completedFuture(mock(TopologyAwareRaftGroupService.class))
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index fcc7294998..c5abadb475 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -156,19 +156,20 @@ import
org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMe
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
@@ -188,7 +189,6 @@ import
org.apache.ignite.internal.storage.pagememory.configuration.schema.Persis
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineExtensionConfigurationSchema;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
-import org.apache.ignite.internal.table.TableRaftService;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
@@ -359,6 +359,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ electPrimaryReplica(node);
+
alterZone(node, ZONE_NAME, 2);
waitPartitionAssignmentsSyncedToExpected(0, 2);
@@ -378,6 +380,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ electPrimaryReplica(node);
+
alterZone(node, ZONE_NAME, 2);
waitPartitionAssignmentsSyncedToExpected(TABLE_NAME, 0, 2);
@@ -399,6 +403,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ electPrimaryReplica(node);
+
alterZone(node, ZONE_NAME, 2);
alterZone(node, ZONE_NAME, 3);
@@ -417,6 +423,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ electPrimaryReplica(node);
+
alterZone(node, ZONE_NAME, 2);
alterZone(node, ZONE_NAME, 3);
alterZone(node, ZONE_NAME, 2);
@@ -442,21 +450,31 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 2);
+ electPrimaryReplica(node0);
+
Set<String> partitionNodesConsistentIds =
getPartitionClusterNodes(node0, 0).stream()
.map(Assignment::consistentId)
.collect(toSet());
Node newNode = nodes.stream().filter(n ->
!partitionNodesConsistentIds.contains(n.name)).findFirst().orElseThrow();
- Node leaderNode =
findNodeByConsistentId(table.tableRaftService().leaderAssignment(0).name());
+ ClusterNode leaderClusterNode = ReplicaTestUtils.leaderAssignment(
+ node1.replicaManager,
+ node1.clusterService.topologyService(),
+ table.tableId(),
+ 0
+ );
+
+ Node leaderNode = findNodeByConsistentId(leaderClusterNode.name());
String nonLeaderNodeConsistentId = partitionNodesConsistentIds.stream()
.filter(n -> !n.equals(leaderNode.name))
.findFirst()
.orElseThrow();
- TableViewInternal nonLeaderTable =
- (TableViewInternal)
findNodeByConsistentId(nonLeaderNodeConsistentId).tableManager.table(TABLE_NAME);
+ Node nonLeaderNode = findNodeByConsistentId(nonLeaderNodeConsistentId);
+
+ TableViewInternal nonLeaderTable = (TableViewInternal)
nonLeaderNode.tableManager.table(TABLE_NAME);
var countDownLatch = new CountDownLatch(1);
@@ -481,10 +499,10 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(countDownLatch.await(10, SECONDS));
- TableRaftService tableRaftService =
nonLeaderTable.internalTable().tableRaftService();
-
assertThat(
-
tableRaftService.partitionRaftGroupService(0).transferLeadership(new
Peer(nonLeaderNodeConsistentId)),
+ ReplicaTestUtils.getRaftClient(nonLeaderNode.replicaManager,
nonLeaderTable.tableId(), 0)
+ .map(raftClient -> raftClient.transferLeadership(new
Peer(nonLeaderNodeConsistentId)))
+ .orElse(null),
willCompleteSuccessfully()
);
@@ -509,6 +527,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 1);
+ electPrimaryReplica(node);
+
JraftServerImpl raftServer = (JraftServerImpl) nodes.stream()
.filter(n -> n.raftManager.localNodes().stream().anyMatch(grp
-> grp.toString().contains("_part_")))
.findFirst()
@@ -551,6 +571,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 2);
+ electPrimaryReplica(node);
+
Set<Assignment> assignmentsAfterChangeReplicas =
getPartitionClusterNodes(node, 0);
Set<Assignment> evictedAssignments =
getEvictedAssignments(assignmentsBeforeChangeReplicas,
assignmentsAfterChangeReplicas);
@@ -653,6 +675,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 1);
+ electPrimaryReplica(node);
+
alterZone(node, ZONE_NAME, 3);
waitPartitionAssignmentsSyncedToExpected(0, 3);
@@ -676,6 +700,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 1);
+ electPrimaryReplica(node);
+
Set<Assignment> assignmentsBeforeRebalance =
getPartitionClusterNodes(node, 0);
String newNodeNameForAssignment = nodes.stream()
@@ -742,6 +768,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 1);
+ electPrimaryReplica(node);
+
var assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
String nodeNameAssignedBeforeRebalance =
assignmentsBeforeRebalance.stream()
.findFirst()
@@ -813,22 +841,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
}
private static boolean isNodeUpdatesPeersOnGroupService(Node node,
Set<Peer> desiredPeers) {
- // TODO: will be replaced with replica usage in
https://issues.apache.org/jira/browse/IGNITE-22218
- TableRaftService tblRaftSvc = node.tableManager.startedTables()
- .get(getTableId(node, TABLE_NAME))
- .internalTable()
- .tableRaftService();
- RaftGroupService groupService;
- try {
- groupService = tblRaftSvc.partitionRaftGroupService(0);
- } catch (IgniteInternalException e) {
- return false;
- }
- List<Peer> peersList = groupService.peers();
- boolean isUpdated = peersList.stream()
- .collect(toSet())
- .equals(desiredPeers);
- return isUpdated;
+ return ReplicaTestUtils.getRaftClient(node.replicaManager,
getTableId(node, TABLE_NAME), 0)
+ .map(raftClient ->
raftClient.peers().stream().collect(toSet()).equals(desiredPeers))
+ .orElse(false);
}
private void clearSpyInvocations() {
@@ -899,28 +914,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
Node anyNode = nodes.get(0);
Set<Assignment> assignments = getPartitionClusterNodes(anyNode,
tableName, replicasNum);
assertTrue(waitForCondition(
- () -> {
- try {
- return nodes.stream()
- .filter(n -> isNodeInAssignments(n,
assignments))
- .allMatch(n -> {
- // TODO: will be replaced with replica
usage in https://issues.apache.org/jira/browse/IGNITE-22218
- TableRaftService trs = n.tableManager
- .cachedTable(getTableId(n,
tableName))
- .internalTable()
- .tableRaftService();
-
- try {
- return
trs.partitionRaftGroupService(partNum) != null;
- } catch (IgniteInternalException e) {
- return false;
- }
- });
- } catch (IgniteInternalException e) {
- // Raft group service not found.
- return false;
- }
- },
+ () -> nodes.stream()
+ .filter(n -> isNodeInAssignments(n, assignments))
+ .allMatch(n ->
ReplicaTestUtils.getRaftClient(n.replicaManager, getTableId(n, tableName),
partNum).isPresent()),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
}
@@ -947,6 +943,22 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
return IntStream.range(0, nodes.size()).filter(i ->
getNode(i).name.equals(consistentId)).findFirst().orElseThrow();
}
+ private void electPrimaryReplica(Node node) {
+ Set<Assignment> assignments = getPartitionClusterNodes(node, 0);
+
+ String leaseholderConsistentId =
assignments.stream().findFirst().get().consistentId();
+
+ ClusterNode leaseholder = nodes.stream()
+ .filter(n ->
n.clusterService.topologyService().localMember().name().equals(leaseholderConsistentId))
+ .findFirst()
+ .get()
+ .clusterService
+ .topologyService()
+ .localMember();
+
+ node.placementDriver.setPrimaryReplicaSupplier(() -> new
TestReplicaMetaImpl(leaseholder.name(), leaseholder.id()));
+ }
+
private static Set<Assignment> getPartitionClusterNodes(Node node, int
partNum) {
return getPartitionClusterNodes(node, TABLE_NAME, partNum);
}
@@ -1360,7 +1372,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
lowWatermark,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler,
- threadPoolsManager.partitionOperationsExecutor()
+ threadPoolsManager.partitionOperationsExecutor(),
+ clockService,
+ placementDriver
)
) {
@Override
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 5302727ff0..2cab2d6cc5 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -96,7 +96,6 @@ import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -295,7 +294,6 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
new HybridClockImpl(),
observableTimestampTracker,
new TestPlacementDriver(clusterNode),
- new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new
SingleClusterNodeResolver(clusterNode)),
transactionInflights,
3_000,
0,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 51189f5c6f..ee381cd859 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -449,13 +449,6 @@ public interface InternalTable extends ManuallyCloseable {
*/
TxStateTableStorage txStateStorage();
- /**
- * Raft service for this table.
- *
- * @return Table raft service.
- */
- TableRaftService tableRaftService();
-
// TODO: IGNITE-14488. Add invoke() methods.
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index e8f874d37b..aae27fa43a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -43,7 +43,6 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
import org.apache.ignite.internal.tx.LockManager;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
@@ -210,11 +209,6 @@ public class TableImpl implements TableViewInternal {
return tbl.partition(keyRow);
}
- @Override
- public ClusterNode leaderAssignment(int partition) {
- return tbl.tableRaftService().leaderAssignment(partition);
- }
-
/** Returns a supplier of index storage wrapper factories for given
partition. */
public TableIndexStoragesSupplier indexStorageAdapters(int partId) {
return () -> {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRaftService.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRaftService.java
deleted file mode 100644
index 8915cfe24e..0000000000
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRaftService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table;
-
-import org.apache.ignite.internal.close.ManuallyCloseable;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.network.ClusterNode;
-
-/**
- * Internal facade that provides methods for table's raft operations.
- */
-public interface TableRaftService extends ManuallyCloseable {
-
- /**
- * Returns cluster node that is the leader of the corresponding partition
group or throws an exception if it cannot be found.
- *
- * @param partition partition number
- * @return leader node of the partition group corresponding to the
partition
- */
- ClusterNode leaderAssignment(int partition);
-
- /**
- * Returns raft group client for corresponding partition.
- *
- * @param partition partition number
- * @return raft group client for corresponding partition
- * @throws IgniteInternalException if partition can't be found.
- */
- RaftGroupService partitionRaftGroupService(int partition);
-
- /**
- * Closes the service.
- */
- @Override
- void close();
-}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
index 56878a266e..ceea5cadd1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
@@ -22,7 +22,6 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import org.apache.ignite.internal.table.distributed.PartitionSet;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
@@ -77,15 +76,6 @@ public interface TableViewInternal extends Table {
*/
<K> int partition(K key, Mapper<K> keyMapper);
- /**
- * Returns cluster node that is the leader of the corresponding partition
group or throws an exception if
- * it cannot be found.
- *
- * @param partition Partition number.
- * @return Leader node of the partition group corresponding to the
partition.
- */
- ClusterNode leaderAssignment(int partition);
-
/**
* Registers the index with given id in a table.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 656a6381a5..613f457ca3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -42,6 +42,7 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -62,7 +63,6 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -194,7 +194,6 @@ import
org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
-import
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
import
org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -1038,11 +1037,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
- // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22218
- Consumer<RaftGroupService> updateTableRaftService = raftClient ->
internalTbl
- .tableRaftService()
- .updateInternalTableRaftGroupService(partId, raftClient);
-
CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
? partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
replicaGrpId,
@@ -1120,7 +1114,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
raftGroupListener,
mvTableStorage.isVolatile(),
snapshotStorageFactory,
- updateTableRaftService,
createListener,
storageIndexTracker,
replicaGrpId,
@@ -1243,6 +1236,50 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return assignments.stream().anyMatch(isLocalNodeAssignment);
}
+ /**
+ * Checks whether the local node is the primary replica (leaseholder) of the given replication group.
+ * <br>
+ * Internally this uses {@link PlacementDriver#getPrimaryReplica} with an earlier safe time value (shifted back by the maximum
+ * clock skew), because the metastore waits until pending or stable assignments events have been handled and only then
+ * increments the safe time. On the other hand, the placement driver internally waits for the metastore to reach the given
+ * safe time plus {@link ClockService#maxClockSkewMillis}. So, if the given time were just {@link ClockService#now}, there would
+ * be a deadlock: the metastore would wait until assignments handling is over, while the placement driver would wait for a
+ * safe time that has not been applied yet.
+ * <br>
+ * To solve this issue, the current safe time minus the skew is passed to {@link PlacementDriver#getPrimaryReplica}, so the
+ * placement driver can successfully get the primary replica for a timestamp preceding the start of the handling. There is
+ * also a corner case for tests that use {@code WatchListenerInhibitor#metastorageEventsInhibitor}: it leaves the current safe
+ * time equal to {@link HybridTimestamp#MIN_VALUE}, and subtracting the skew would then throw an
+ * {@link IllegalArgumentException} from {@link HybridTimestamp}. Since no primary replica can exist at the minimal possible
+ * timestamp, {@code false} is returned in that case.
+ *
+ * @param replicationGroupId ID of the replication group for which to check whether the local node is the primary.
+ * @return Future completed with {@code true} if the local node is the primary, and {@code false} otherwise.
+ */
+ private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId
replicationGroupId) {
+ HybridTimestamp currentSafeTime =
metaStorageMgr.clusterTime().currentSafeTime();
+
+ if (HybridTimestamp.MIN_VALUE.equals(currentSafeTime)) {
+ return falseCompletedFuture();
+ }
+
+ long skewMs = clockService.maxClockSkewMillis();
+
+ try {
+ HybridTimestamp previousMetastoreSafeTime =
currentSafeTime.subtractPhysicalTime(skewMs);
+
+ return
executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId,
previousMetastoreSafeTime)
+ .thenApply(replicaMeta -> replicaMeta != null
+ && replicaMeta.getLeaseholderId() != null
+ &&
replicaMeta.getLeaseholderId().equals(localNode().id()));
+ } catch (IllegalArgumentException e) {
+ long currentSafeTimeMs = currentSafeTime.longValue();
+
+ throw new AssertionError("Got a negative time [currentSafeTime=" +
currentSafeTime
+ + ", currentSafeTimeMs=" + currentSafeTimeMs
+ + ", skewMs=" + skewMs
+ + ", internal=" + (currentSafeTimeMs + ((-skewMs) <<
LOGICAL_TIME_BITS_SIZE)) + "]", e);
+ }
+ }
+
private PartitionDataStorage partitionDataStorage(MvPartitionStorage
partitionStorage, InternalTable internalTbl, int partId) {
return new SnapshotAwarePartitionDataStorage(
partitionStorage,
@@ -1369,13 +1406,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
int partitions = zoneDescriptor.partitions();
- TableRaftServiceImpl tableRaftService = new TableRaftServiceImpl(
- tableName,
- partitions,
- new Int2ObjectOpenHashMap<>(partitions),
- topologyService
- );
-
InternalTableImpl internalTable = new InternalTableImpl(
tableName,
tableDescriptor.id(),
@@ -1388,7 +1418,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
clock,
observableTimestampTracker,
executorInclinedPlacementDriver,
- tableRaftService,
transactionInflights,
implicitTransactionTimeout,
attemptsObtainLock,
@@ -1965,7 +1994,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return nullCompletedFuture();
}
- return changePeersOnRebalance(table, replicaGrpId,
pendingAssignments.nodes(), revision);
+ return changePeersOnRebalance(replicaGrpId,
pendingAssignments.nodes(), revision);
});
} finally {
busyLock.leaveBusy();
@@ -2059,7 +2088,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
return localServicesStartFuture
- .thenComposeAsync(v -> inBusyLock(busyLock, () ->
isLocalNodeLeaseholder(replicaGrpId)), ioExecutor)
+ .thenComposeAsync(v -> inBusyLock(busyLock, () ->
isLocalNodeIsPrimary(replicaGrpId)), ioExecutor)
.thenAcceptAsync(isLeaseholder -> inBusyLock(busyLock, () -> {
boolean isLocalNodeInStableOrPending =
isNodeInReducedStableOrPendingAssignments(
replicaGrpId,
@@ -2082,10 +2111,9 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
? pendingAssignmentsNodes
: union(pendingAssignmentsNodes,
stableAssignments.nodes());
- tbl.internalTable()
- .tableRaftService()
- .partitionRaftGroupService(partitionId)
-
.updateConfiguration(fromAssignments(newAssignments));
+ replicaMgr.replica(replicaGrpId)
+ .thenApply(Replica::raftClient)
+ .thenAccept(raftClient ->
raftClient.updateConfiguration(fromAssignments(newAssignments)));
}), ioExecutor);
}
@@ -2119,56 +2147,60 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
private CompletableFuture<Void> changePeersOnRebalance(
- // TODO: remove excessive argument (used to get raft-client)
https://issues.apache.org/jira/browse/IGNITE-22218
- TableImpl table,
TablePartitionId replicaGrpId,
Set<Assignment> pendingAssignments,
long revision
) {
- int partId = replicaGrpId.partitionId();
-
- RaftGroupService partGrpSvc =
table.internalTable().tableRaftService().partitionRaftGroupService(partId);
+ return replicaMgr.replica(replicaGrpId)
+ .thenApply(Replica::raftClient)
+ .thenCompose(raftClient ->
raftClient.refreshAndGetLeaderWithTerm()
+ .exceptionally(throwable -> {
+ throwable = unwrapCause(throwable);
- return partGrpSvc.refreshAndGetLeaderWithTerm()
- .exceptionally(throwable -> {
- throwable = unwrapCause(throwable);
+ if (throwable instanceof TimeoutException) {
+ LOG.info(
+ "Node couldn't get the leader within
timeout so the changing peers is skipped [grp={}].",
+ replicaGrpId
+ );
- if (throwable instanceof TimeoutException) {
- LOG.info("Node couldn't get the leader within timeout
so the changing peers is skipped [grp={}].", replicaGrpId);
+ return LeaderWithTerm.NO_LEADER;
+ }
- return LeaderWithTerm.NO_LEADER;
- }
+ throw new IgniteInternalException(
+ INTERNAL_ERR,
+ "Failed to get a leader for the RAFT
replication group [get=" + replicaGrpId + "].",
+ throwable
+ );
+ })
+ .thenCompose(leaderWithTerm -> {
+ if (leaderWithTerm.isEmpty() ||
!isLocalPeer(leaderWithTerm.leader())) {
+ return nullCompletedFuture();
+ }
- throw new IgniteInternalException(
- INTERNAL_ERR,
- "Failed to get a leader for the RAFT replication
group [get=" + replicaGrpId + "].",
- throwable
- );
- })
- .thenCompose(leaderWithTerm -> {
- if (leaderWithTerm.isEmpty() ||
!isLocalPeer(leaderWithTerm.leader())) {
- return nullCompletedFuture();
- }
+ // run update of raft configuration if this node
is a leader
+ LOG.info("Current node={} is the leader of
partition raft group={}. "
+ + "Initiate rebalance process for
partition={}, table={}",
+ leaderWithTerm.leader(),
+ replicaGrpId,
+ replicaGrpId.partitionId(),
+ tables.get(replicaGrpId.tableId()).name()
+ );
- // run update of raft configuration if this node is a
leader
- LOG.info("Current node={} is the leader of partition raft
group={}. "
- + "Initiate rebalance process for
partition={}, table={}",
- leaderWithTerm.leader(), replicaGrpId, partId,
tables.get(replicaGrpId.tableId()).name());
-
- return
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
- .thenCompose(latestPendingAssignmentsEntry -> {
- // Do not change peers of the raft group if
this is a stale event.
- // Note that we start raft node before for the
sake of the consistency in a
- // starting and stopping raft nodes.
- if (revision <
latestPendingAssignmentsEntry.revision()) {
- return nullCompletedFuture();
- }
+ return
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
+ .thenCompose(latestPendingAssignmentsEntry
-> {
+ // Do not change peers of the raft group if this is a stale event.
+ // Note that the raft node is started beforehand for the sake of consistency
+ // between starting and stopping raft nodes.
+ if (revision <
latestPendingAssignmentsEntry.revision()) {
+ return nullCompletedFuture();
+ }
- PeersAndLearners newConfiguration =
fromAssignments(pendingAssignments);
+ PeersAndLearners newConfiguration =
fromAssignments(pendingAssignments);
- return
partGrpSvc.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term());
- });
- });
+ return
raftClient.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term());
+ });
+ })
+ );
}
private SnapshotStorageFactory createSnapshotStorageFactory(
@@ -2397,23 +2429,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}, ioExecutor).thenCompose(identity());
}
- private CompletableFuture<Boolean>
isLocalNodeLeaseholder(ReplicationGroupId replicationGroupId) {
- HybridTimestamp previousMetastoreSafeTime =
metaStorageMgr.clusterTime()
- .currentSafeTime()
- .addPhysicalTime(-clockService.maxClockSkewMillis());
-
- return
executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId,
previousMetastoreSafeTime)
- .thenApply(replicaMeta -> replicaMeta != null
- && replicaMeta.getLeaseholderId() != null
- &&
replicaMeta.getLeaseholderId().equals(localNode().name()));
- }
-
private CompletableFuture<Void> updatePartitionClients(
TablePartitionId tablePartitionId,
- Set<Assignment> stableAssignments,
- long revision
+ Set<Assignment> stableAssignments
) {
- return
isLocalNodeLeaseholder(tablePartitionId).thenCompose(isLeaseholder ->
inBusyLock(busyLock, () -> {
+ return
isLocalNodeIsPrimary(tablePartitionId).thenCompose(isLeaseholder ->
inBusyLock(busyLock, () -> {
boolean isLocalInStable =
isLocalNodeInAssignments(stableAssignments);
if (!isLocalInStable && !isLeaseholder) {
@@ -2421,16 +2441,13 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
assert replicaMgr.isReplicaStarted(tablePartitionId)
- : "The local node is outside of the replication group
[inStable=" + isLocalInStable
+ : "The local node is outside of the replication group
[stable=" + stableAssignments
+ ", isLeaseholder=" + isLeaseholder + "].";
// Update raft client peers and learners according to the actual
assignments.
- return tablesById(revision).thenAccept(t ->
t.get(tablePartitionId.tableId())
- .internalTable()
- .tableRaftService()
- .partitionRaftGroupService(tablePartitionId.partitionId())
- .updateConfiguration(fromAssignments(stableAssignments))
- );
+ return replicaMgr.replica(tablePartitionId)
+ .thenApply(Replica::raftClient)
+ .thenAccept(raftClient ->
raftClient.updateConfiguration(fromAssignments(stableAssignments)));
}));
}
@@ -2444,7 +2461,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
CompletableFuture<Void> clientUpdateFuture = isRecovery
// Updating clients is not needed on recovery.
? nullCompletedFuture()
- : updatePartitionClients(tablePartitionId, stableAssignments,
revision);
+ : updatePartitionClients(tablePartitionId, stableAssignments);
boolean shouldStopLocalServices = (pendingAssignments.force()
? pendingAssignments.nodes().stream()
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 715ec99e94..c46521c8f5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -203,9 +203,6 @@ public class InternalTableImpl implements InternalTable {
/** Map update guarded by {@link #updatePartitionMapsMux}. */
private volatile Int2ObjectMap<PendingComparableValuesTracker<Long, Void>>
storageIndexTrackerByPartitionId = emptyMap();
- /** Table raft service. */
- private final TableRaftServiceImpl tableRaftService;
-
/** Implicit transaction timeout. */
private final long implicitTransactionTimeout;
@@ -225,7 +222,6 @@ public class InternalTableImpl implements InternalTable {
* @param replicaSvc Replica service.
* @param clock A hybrid logical clock.
* @param placementDriver Placement driver.
- * @param tableRaftService Table raft service.
* @param transactionInflights Transaction inflights.
* @param implicitTransactionTimeout Implicit transaction timeout.
* @param attemptsObtainLock Attempts to take lock.
@@ -242,7 +238,6 @@ public class InternalTableImpl implements InternalTable {
HybridClock clock,
HybridTimestampTracker observableTimestampTracker,
PlacementDriver placementDriver,
- TableRaftServiceImpl tableRaftService,
TransactionInflights transactionInflights,
long implicitTransactionTimeout,
int attemptsObtainLock,
@@ -260,7 +255,6 @@ public class InternalTableImpl implements InternalTable {
this.clock = clock;
this.observableTimestampTracker = observableTimestampTracker;
this.placementDriver = placementDriver;
- this.tableRaftService = tableRaftService;
this.transactionInflights = transactionInflights;
this.implicitTransactionTimeout = implicitTransactionTimeout;
this.attemptsObtainLock = attemptsObtainLock;
@@ -294,16 +288,9 @@ public class InternalTableImpl implements InternalTable {
@Override
public void name(String newName) {
- tableRaftService.name(newName);
-
this.tableName = newName;
}
- @Override
- public TableRaftServiceImpl tableRaftService() {
- return tableRaftService;
- }
-
/**
* Enlists a single row into a transaction.
*
@@ -2204,7 +2191,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
public void close() {
- tableRaftService.close();
+ // No-op
}
// TODO: IGNITE-17963 Use smarter logic for recipient node evaluation.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/TableRaftServiceImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/TableRaftServiceImpl.java
deleted file mode 100644
index ac0ba7d8c0..0000000000
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/TableRaftServiceImpl.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.storage;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.table.TableRaftService;
-import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.TestOnly;
-
-/**
- * Table's raft client storage.
- */
-public class TableRaftServiceImpl implements TableRaftService {
-
- /** Mutex for the partition maps update. */
- private final Object updatePartitionMapsMux = new Object();
-
- /** Map update guarded by {@link #updatePartitionMapsMux}. */
- private volatile Int2ObjectMap<RaftGroupService>
raftGroupServiceByPartitionId;
-
- /** Resolver that resolves a node consistent ID to cluster node. */
- private final ClusterNodeResolver clusterNodeResolver;
-
- /** Partitions. */
- private final int partitions;
-
- /** Table name. */
- private volatile String tableName;
-
- /**
- * Constructor.
- *
- * @param tableName Table name.
- * @param partitions Partitions.
- * @param partMap Map partition id to raft group.
- * @param clusterNodeResolver Cluster node resolver.
- */
- public TableRaftServiceImpl(
- String tableName,
- int partitions,
- Int2ObjectMap<RaftGroupService> partMap,
- ClusterNodeResolver clusterNodeResolver
- ) {
- this.tableName = tableName;
- this.partitions = partitions;
- this.raftGroupServiceByPartitionId = partMap;
- this.clusterNodeResolver = clusterNodeResolver;
- }
-
- @Override
- public ClusterNode leaderAssignment(int partition) {
- awaitLeaderInitialization();
-
- RaftGroupService raftGroupService =
raftGroupServiceByPartitionId.get(partition);
- if (raftGroupService == null) {
- throw new IgniteInternalException("No such partition " + partition
+ " in table " + tableName);
- }
-
- return
clusterNodeResolver.getByConsistentId(raftGroupService.leader().consistentId());
- }
-
- /** {@inheritDoc} */
- @Override
- public RaftGroupService partitionRaftGroupService(int partition) {
- RaftGroupService raftGroupService =
raftGroupServiceByPartitionId.get(partition);
- if (raftGroupService == null) {
- throw new IgniteInternalException("No such partition " + partition
+ " in table " + tableName);
- }
-
- return raftGroupService;
- }
-
- /** {@inheritDoc} */
- @Override
- public void close() {
- for (RaftGroupService srv : raftGroupServiceByPartitionId.values()) {
- srv.shutdown();
- }
- }
-
- public void name(String newName) {
- this.tableName = newName;
- }
-
- /**
- * Updates internal table raft group service for given partition.
- *
- * @param p Partition.
- * @param raftGrpSvc Raft group service.
- */
- public void updateInternalTableRaftGroupService(int p, RaftGroupService
raftGrpSvc) {
- RaftGroupService oldSrvc;
-
- synchronized (updatePartitionMapsMux) {
- Int2ObjectMap<RaftGroupService> newPartitionMap = new
Int2ObjectOpenHashMap<>(partitions);
-
- newPartitionMap.putAll(raftGroupServiceByPartitionId);
-
- oldSrvc = newPartitionMap.put(p, raftGrpSvc);
-
- raftGroupServiceByPartitionId = newPartitionMap;
- }
-
- if (oldSrvc != null && oldSrvc != raftGrpSvc) {
- oldSrvc.shutdown();
- }
- }
-
- /**
- * Returns map of partition -> list of peers and learners of that
partition.
- */
- @TestOnly
- public Map<Integer, List<String>> peersAndLearners() {
- awaitLeaderInitialization();
-
- return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream()
- .collect(Collectors.toMap(Entry::getIntKey, e -> {
- RaftGroupService service = e.getValue();
- return Stream.of(service.peers(), service.learners())
- .filter(Objects::nonNull)
- .flatMap(Collection::stream)
- .map(Peer::consistentId)
- .collect(Collectors.toList());
- }));
- }
-
- private void awaitLeaderInitialization() {
- List<CompletableFuture<Void>> futs = new ArrayList<>();
-
- for (RaftGroupService raftSvc :
raftGroupServiceByPartitionId.values()) {
- if (raftSvc.leader() == null) {
- futs.add(raftSvc.refreshLeader());
- }
- }
-
- CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new)).join();
- }
-}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 89fad7875a..b3a9d13c8c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -286,7 +286,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
when(topologyService.localMember()).thenReturn(node);
when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(emptySetCompletedFuture());
- when(replicaMgr.startReplica(any(), any(), any(), any(),
any(PendingComparableValuesTracker.class), any()))
+ when(replicaMgr.startReplica(any(), any(), any(),
any(PendingComparableValuesTracker.class), any()))
.thenReturn(nullCompletedFuture());
when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
when(replicaMgr.weakStartReplica(any(), any(),
any())).thenReturn(trueCompletedFuture());
@@ -364,7 +364,9 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
lowWatermark,
ForkJoinPool.commonPool(),
mock(ScheduledExecutorService.class),
- partitionOperationsExecutor
+ partitionOperationsExecutor,
+ clockService,
+ placementDriver
)
) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index bd5238a2fd..0f184e46f8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -294,7 +294,7 @@ public class TableManagerTest extends IgniteAbstractTest {
when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(emptySetCompletedFuture());
- when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(),
any(), any(), any(), any()))
+ when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(),
any(), any(), any()))
.thenReturn(nullCompletedFuture());
when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
when(replicaMgr.weakStartReplica(any(), any(), any())).thenAnswer(inv
-> {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 02f2146d46..d9d53b08fc 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -148,7 +148,6 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
@Mock MvTableStorage tableStorage,
@Mock TxStateTableStorage txStateTableStorage,
@Mock TxStateStorage txStateStorage,
- @Mock TableRaftServiceImpl tableRaftService,
@Mock TransactionStateResolver transactionStateResolver,
@Mock StorageUpdateHandler storageUpdateHandler,
@Mock ValidationSchemasSource validationSchemasSource,
@@ -202,7 +201,6 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
clock,
new HybridTimestampTracker(),
placementDriver,
- tableRaftService,
new TransactionInflights(placementDriver, clockService),
0,
0,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 244687570a..1fef021d82 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -72,7 +71,6 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(HybridClock.class),
new HybridTimestampTracker(),
mock(PlacementDriver.class),
- new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
mock(TransactionInflights.class),
3_000,
0,
@@ -123,7 +121,6 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(HybridClock.class),
new HybridTimestampTracker(),
mock(PlacementDriver.class),
- new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
mock(TransactionInflights.class),
3_000,
0,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index c7c91c4430..9ae2390a08 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -137,7 +137,6 @@ import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
@@ -768,7 +767,6 @@ public class ItTxTestCluster {
startClient ? clientClock : clocks.get(localNodeName),
timestampTracker,
placementDriver,
- new TableRaftServiceImpl(tableName, 1, clients,
nodeResolver),
clientTransactionInflights,
500,
0,
@@ -1087,4 +1085,8 @@ public class ItTxTestCluster {
public Map<String, ReplicaManager> replicaManagers() {
return replicaManagers;
}
+
+ public Map<String, ClusterService> clusterServices() {
+ return clusterServices;
+ }
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 1c6a148b13..0cd93893c6 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaImpl;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
@@ -1686,7 +1687,12 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
0,
internalTx.id(),
internalTx.readTimestamp(),
- internalTable.tableRaftService().leaderAssignment(0),
+ ReplicaTestUtils.leaderAssignment(
+
txTestCluster.replicaManagers().get(txTestCluster.localNodeName()),
+
txTestCluster.clusterServices().get(txTestCluster.localNodeName()).topologyService(),
+ internalTable.tableId(),
+ 0
+ ),
internalTx.coordinatorId()
)
: internalTable.scan(0, internalTx);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 2d733958a0..5f747189e7 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
@@ -99,7 +98,6 @@ import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
@@ -173,6 +171,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
) {
this(
replicaSvc,
+ new TestPlacementDriver(LOCAL_NODE),
new TestMvPartitionStorage(0),
schema,
txConfiguration,
@@ -184,6 +183,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
* Creates a new local table.
*
* @param replicaSvc Replica service.
+ * @param placementDriver Placement driver.
* @param storage Storage.
* @param schema Schema.
* @param txConfiguration Transaction configuration.
@@ -191,6 +191,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
*/
public DummyInternalTableImpl(
ReplicaService replicaSvc,
+ PlacementDriver placementDriver,
MvPartitionStorage storage,
SchemaDescriptor schema,
TransactionConfiguration txConfiguration,
@@ -203,11 +204,11 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
null,
schema,
new HybridTimestampTracker(),
- new TestPlacementDriver(LOCAL_NODE),
+ placementDriver,
storageUpdateConfiguration,
txConfiguration,
new RemotelyTriggeredResourceRegistry(),
- new TransactionInflights(new TestPlacementDriver(LOCAL_NODE),
CLOCK_SERVICE)
+ new TransactionInflights(placementDriver, CLOCK_SERVICE)
);
}
@@ -249,12 +250,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
CLOCK,
tracker,
placementDriver,
- new TableRaftServiceImpl(
- "test",
- 1,
- Int2ObjectMaps.singleton(PART_ID,
mock(RaftGroupService.class)),
- new SingleClusterNodeResolver(LOCAL_NODE)
- ),
transactionInflights,
3_000,
0,
@@ -262,7 +257,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(StreamerReceiverRunner.class)
);
- RaftGroupService svc =
tableRaftService().partitionRaftGroupService(PART_ID);
+ RaftGroupService svc = mock(RaftGroupService.class);
groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) :
crossTableGroupId;
@@ -388,7 +383,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
replicaListener = new PartitionReplicaListener(
mvPartStorage,
- tableRaftService().partitionRaftGroupService(PART_ID),
+ svc,
this.txManager,
this.txManager.lockManager(),
Runnable::run,
@@ -406,7 +401,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
LOCAL_NODE,
new AlwaysSyncedSchemaSyncService(),
catalogService,
- new TestPlacementDriver(LOCAL_NODE),
+ placementDriver,
mock(ClusterNodeResolver.class),
resourcesRegistry,
schemaManager,