This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8f9b321fee IGNITE-21805 Refactor TableManager and move all RAFT
related pieces to Replica (#3633)
8f9b321fee is described below
commit 8f9b321fee2848757d8174b1d0c330326d21d485
Author: Mikhail Efremov <[email protected]>
AuthorDate: Thu Jun 6 15:47:36 2024 +0600
IGNITE-21805 Refactor TableManager and move all RAFT related pieces to
Replica (#3633)
---
.../rebalance/PartitionMover.java | 9 +-
.../raft/ExecutorInclinedRaftCommandRunner.java | 5 +
.../apache/ignite/raft/jraft/core/NodeImpl.java | 3 +-
.../ItPlacementDriverReplicaSideTest.java | 60 +++-
.../apache/ignite/internal/replicator/Replica.java | 9 +-
.../ignite/internal/replicator/ReplicaManager.java | 353 +++++++++++++++-----
.../replicator/listener/ReplicaListener.java | 5 +-
.../replicator/PlacementDriverReplicaSideTest.java | 6 +-
.../internal/replicator/ReplicaManagerTest.java | 37 ++-
.../runner/app/ItIgniteNodeRestartTest.java | 18 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 26 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 74 ++++-
.../ItDisasterRecoveryReconfigurationTest.java | 9 +-
.../rebalance/ItRebalanceDistributedTest.java | 24 +-
.../internal/table/distributed/TableManager.java | 370 ++++++++-------------
.../replicator/PartitionReplicaListener.java | 10 +
.../table/distributed/PartitionMoverTest.java | 9 +-
.../distributed/TableManagerRecoveryTest.java | 14 +-
.../table/distributed/TableManagerTest.java | 27 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 48 ++-
20 files changed, 686 insertions(+), 430 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
index 76275091eb..694e8ed7fe 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
@@ -41,12 +41,12 @@ public class PartitionMover {
private final IgniteSpinBusyLock busyLock;
- private final Supplier<RaftGroupService> raftGroupServiceSupplier;
+ private final Supplier<CompletableFuture<RaftGroupService>>
raftGroupServiceSupplier;
/**
* Constructor.
*/
- public PartitionMover(IgniteSpinBusyLock busyLock,
Supplier<RaftGroupService> raftGroupServiceSupplier) {
+ public PartitionMover(IgniteSpinBusyLock busyLock,
Supplier<CompletableFuture<RaftGroupService>> raftGroupServiceSupplier) {
this.busyLock = busyLock;
this.raftGroupServiceSupplier = raftGroupServiceSupplier;
}
@@ -64,8 +64,9 @@ public class PartitionMover {
}
try {
- return raftGroupServiceSupplier.get()
- .changePeersAsync(peersAndLearners, term)
+ return raftGroupServiceSupplier
+ .get()
+ .thenCompose(raftGroupService ->
raftGroupService.changePeersAsync(peersAndLearners, term))
.handle((resp, err) -> {
if (!busyLock.enterBusy()) {
throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
index 4b16d1131b..da7fb672d3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/ExecutorInclinedRaftCommandRunner.java
@@ -46,4 +46,9 @@ public class ExecutorInclinedRaftCommandRunner implements
RaftCommandRunner {
return future.thenApplyAsync(identity(), completionExecutor);
}
+
+ /** Returns decorated Raft-client. */
+ public RaftCommandRunner decoratedCommandRunner() {
+ return commandRunner;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 9d836a27b2..007e9387f4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -717,7 +717,7 @@ public class NodeImpl implements Node, RaftServerService {
electionRound = 0;
if (electionAdjusted) {
- LOG.info("Election timeout was reset to initial value due to
successful leader election.");
+ LOG.info("Election timeout was reset to initial value.");
resetElectionTimeoutMs(initialElectionTimeout);
electionAdjusted = false;
}
@@ -3445,6 +3445,7 @@ public class NodeImpl implements Node, RaftServerService {
this.conf.setConf(newConf);
this.conf.getOldConf().reset();
stepDown(this.currTerm + 1, false, new Status(RaftError.ESETPEER,
"Raft node set peer normally"));
+ resetElectionTimeoutToInitial();
return Status.OK();
}
finally {
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 5b09cd0e0a..f82741b11d 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
+import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -75,6 +76,7 @@ import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessage
import
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.TestLozaFactory;
@@ -83,7 +85,10 @@ import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaMessageTestGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -197,7 +202,13 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
Set.of(ReplicaMessageTestGroup.class),
new TestPlacementDriver(primaryReplicaSupplier),
partitionOperationsExecutor,
- new NoOpFailureProcessor()
+ () ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+ new NoOpFailureProcessor(),
+ // TODO: IGNITE-22222 can't pass
ThreadLocalPartitionCommandsMarshaller there due to dependency loop
+ null,
+ topologyAwareRaftGroupServiceFactory,
+ raftManager,
+ new VolatileLogStorageFactoryCreator(nodeName,
workDir.resolve("volatile-log-spillout"))
);
replicaManagers.put(nodeName, replicaManager);
@@ -220,7 +231,9 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
}
});
- servicesToClose.add(() ->
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10,
TimeUnit.SECONDS));
+ servicesToClose.addAll(List.of(
+ () ->
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10,
TimeUnit.SECONDS)
+ ));
}
}
@@ -477,9 +490,11 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
var rftNodeId = new RaftNodeId(groupId, peer);
+ PeersAndLearners newConfiguration = fromConsistentIds(nodes);
+
CompletableFuture<TopologyAwareRaftGroupService> raftClientFut =
raftManager.startRaftGroupNode(
rftNodeId,
- fromConsistentIds(nodes),
+ newConfiguration,
new TestRaftGroupListener(),
RaftGroupEventsListener.noopLsnr,
RaftGroupOptions.defaults(),
@@ -487,24 +502,33 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
);
serviceFutures.add(raftClientFut);
- CompletableFuture<Replica> replicaFuture =
raftClientFut.thenCompose(raftClient -> {
+ CompletableFuture<Boolean> replicaFuture =
raftClientFut.thenCompose(raftClient -> {
try {
+ ReplicaListener listener = new ReplicaListener() {
+ @Override
+ public CompletableFuture<ReplicaResult>
invoke(ReplicaRequest request, String senderId) {
+ log.info("Handle request [type={}]",
request.getClass().getSimpleName());
+
+ return raftClient
+
.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
+ .thenCompose(ignored -> replicaListener ==
null
+ ? completedFuture(new
ReplicaResult(null, null))
+ : replicaListener.apply(request,
senderId));
+ }
+
+ @Override
+ public RaftCommandRunner raftClient() {
+ return raftClient;
+ }
+ };
+
return replicaManager.startReplica(
groupId,
- (request, senderId) -> {
- log.info("Handle request [type={}]",
request.getClass().getSimpleName());
-
- return
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
- .thenCompose(ignored -> {
- if (replicaListener == null) {
- return completedFuture(new
ReplicaResult(null, null));
- } else {
- return
replicaListener.apply(request, senderId);
- }
- });
- },
- raftClient,
- new
PendingComparableValuesTracker<>(Long.MAX_VALUE));
+ newConfiguration,
+ (unused) -> { },
+ (unused) -> listener,
+ new
PendingComparableValuesTracker<>(Long.MAX_VALUE),
+ completedFuture(raftClient));
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 380e5c7b22..41883da2d6 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -100,7 +100,6 @@ public class Replica {
* @param replicaGrpId Replication group id.
* @param listener Replica listener.
* @param storageIndexTracker Storage index tracker.
- * @param raftClient Topology aware Raft client.
* @param localNode Instance of the local node.
* @param executor External executor.
* @param placementDriver Placement driver.
@@ -110,7 +109,6 @@ public class Replica {
ReplicationGroupId replicaGrpId,
ReplicaListener listener,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
- TopologyAwareRaftGroupService raftClient,
ClusterNode localNode,
ExecutorService executor,
PlacementDriver placementDriver,
@@ -119,7 +117,7 @@ public class Replica {
this.replicaGrpId = replicaGrpId;
this.listener = listener;
this.storageIndexTracker = storageIndexTracker;
- this.raftClient = raftClient;
+ this.raftClient = raftClient();
this.localNode = localNode;
this.executor = executor;
this.placementDriver = placementDriver;
@@ -128,6 +126,11 @@ public class Replica {
raftClient.subscribeLeader(this::onLeaderElected);
}
+ /** Returns Raft-client. */
+ public final TopologyAwareRaftGroupService raftClient() {
+ return (TopologyAwareRaftGroupService) listener.raftClient();
+ }
+
/**
* Processes a replication request on the replica.
*
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 64c29a7b81..2c410caffc 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.replicator;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
-import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS;
@@ -46,8 +44,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -67,7 +69,21 @@ import
org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
import
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
import
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
@@ -88,8 +104,10 @@ import org.apache.ignite.internal.thread.ThreadAttributes;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* Replica manager maintains {@link Replica} instances on an Ignite node.
@@ -125,6 +143,19 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Replica message handler. */
private final NetworkMessageHandler handler;
+ /** Raft manager used to start, stop and reset raft nodes and to start RAFT clients. */
+ // TODO: move into {@method Replica#shutdown}
https://issues.apache.org/jira/browse/IGNITE-22372
+ private final RaftManager raftManager;
+
+ /** Factory used to create raft clients when starting raft groups. */
+ private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
+
+ /** Creator for {@link
org.apache.ignite.internal.raft.storage.LogStorageFactory} for volatile tables.
*/
+ private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
+
+ /** Raft command marshaller used when starting raft groups and raft clients. */
+ private final Marshaller raftCommandsMarshaller;
+
/** Message handler for placement driver messages. */
private final NetworkMessageHandler placementDriverMessageHandler;
@@ -141,8 +172,10 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Scheduled executor for idle safe time sync. */
private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor;
+ /** Executor that will be used to execute requests by replicas. */
private final Executor requestsExecutor;
+ /** Failure processor. */
private final FailureProcessor failureProcessor;
/** Set of message groups to handler as replica requests. */
@@ -154,39 +187,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
private String localNodeId;
- /**
- * Constructor for a replica service.
- *
- * @param nodeName Node name.
- * @param clusterNetSvc Cluster network service.
- * @param cmgMgr Cluster group manager.
- * @param clockService Clock service.
- * @param messageGroupsToHandle Message handlers.
- * @param placementDriver A placement driver.
- */
- @TestOnly
- public ReplicaManager(
- String nodeName,
- ClusterService clusterNetSvc,
- ClusterManagementGroupManager cmgMgr,
- ClockService clockService,
- Set<Class<?>> messageGroupsToHandle,
- PlacementDriver placementDriver,
- Executor requestsExecutor,
- FailureProcessor failureProcessor
- ) {
- this(
- nodeName,
- clusterNetSvc,
- cmgMgr,
- clockService,
- messageGroupsToHandle,
- placementDriver,
- requestsExecutor,
- () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
- failureProcessor
- );
- }
+ private String localNodeConsistentId;
/**
* Constructor for a replica service.
@@ -199,6 +200,12 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param placementDriver A placement driver.
* @param requestsExecutor Executor that will be used to execute requests
by replicas.
* @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe
time propagation period in ms.
+ * @param failureProcessor Failure processor.
+ * @param raftCommandsMarshaller Command marshaller for raft groups
creation.
+ * @param raftGroupServiceFactory A factory for raft-clients creation.
+ * @param raftManager Raft manager used to start, stop and reset raft nodes and to start
raft clients.
+ * @param volatileLogStorageFactoryCreator Creator for {@link
org.apache.ignite.internal.raft.storage.LogStorageFactory} for
+ * volatile tables.
*/
public ReplicaManager(
String nodeName,
@@ -209,18 +216,26 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
PlacementDriver placementDriver,
Executor requestsExecutor,
LongSupplier idleSafeTimePropagationPeriodMsSupplier,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ Marshaller raftCommandsMarshaller,
+ TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
+ RaftManager raftManager,
+ LogStorageFactoryCreator volatileLogStorageFactoryCreator
) {
this.clusterNetSvc = clusterNetSvc;
this.cmgMgr = cmgMgr;
this.clockService = clockService;
this.messageGroupsToHandle = messageGroupsToHandle;
+ this.volatileLogStorageFactoryCreator =
volatileLogStorageFactoryCreator;
this.handler = this::onReplicaMessageReceived;
this.placementDriverMessageHandler =
this::onPlacementDriverMessageReceived;
this.placementDriver = placementDriver;
this.requestsExecutor = requestsExecutor;
this.idleSafeTimePropagationPeriodMsSupplier =
idleSafeTimePropagationPeriodMsSupplier;
this.failureProcessor = failureProcessor;
+ this.raftCommandsMarshaller = raftCommandsMarshaller;
+ this.raftGroupServiceFactory = raftGroupServiceFactory;
+ this.raftManager = raftManager;
scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
1,
@@ -466,75 +481,170 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
});
}
+ private CompletableFuture<Boolean> startReplicaInternal(
+ RaftGroupEventsListener raftGroupEventsListener,
+ RaftGroupListener raftGroupListener,
+ boolean isVolatileStorage,
+ SnapshotStorageFactory snapshotStorageFactory,
+ Consumer<RaftGroupService> updateTableRaftService,
+ Function<RaftGroupService, ReplicaListener> createListener,
+ PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+ TablePartitionId replicaGrpId,
+ PeersAndLearners newConfiguration
+ ) throws NodeStoppingException {
+ RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+
+ RaftGroupOptions groupOptions = groupOptionsForPartition(
+ isVolatileStorage,
+ snapshotStorageFactory);
+
+ // TODO: move into {@method Replica#shutdown}
https://issues.apache.org/jira/browse/IGNITE-22372
+ // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
+ CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut =
((Loza) raftManager).startRaftGroupNode(
+ raftNodeId,
+ newConfiguration,
+ raftGroupListener,
+ raftGroupEventsListener,
+ groupOptions,
+ raftGroupServiceFactory
+ );
+
+ return startReplica(
+ replicaGrpId,
+ newConfiguration,
+ updateTableRaftService,
+ createListener,
+ storageIndexTracker,
+ newRaftClientFut);
+ }
+
/**
- * Starts a replica. If a replica with the same partition id already
exists, the method throws an exception.
+ * Creates and starts a new replica.
*
+ * @param raftGroupEventsListener Raft group events listener for raft
group starting.
+ * @param raftGroupListener Raft group listener for raft group starting.
+ * @param isVolatileStorage Whether the table storage is volatile.
+ * @param snapshotStorageFactory Snapshot storage factory used to configure the raft group
options.
+ * @param updateTableRaftService Temporary consumer that passes the created raft client to TableRaftService until it is
removed in
* TODO: https://issues.apache.org/jira/browse/IGNITE-22218.
+ * @param createListener Function that returns the desired ReplicaListener for the raft client created
inside the {@link #startReplica} method; needed because ReplicaListener is still created in
+ * TableManager.
* @param replicaGrpId Replication group id.
- * @param listener Replica listener.
- * @param raftClient Topology aware Raft client.
* @param storageIndexTracker Storage index tracker.
- * @throws NodeStoppingException If node is stopping.
- * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with
the same replication group id has already been
- * started.
+ * @param newConfiguration A configuration for new raft group.
+ * @return Future that promises ready new replica when done.
*/
- public CompletableFuture<Replica> startReplica(
- ReplicationGroupId replicaGrpId,
- ReplicaListener listener,
- TopologyAwareRaftGroupService raftClient,
- PendingComparableValuesTracker<Long, Void> storageIndexTracker
+ public CompletableFuture<Boolean> startReplica(
+ RaftGroupEventsListener raftGroupEventsListener,
+ RaftGroupListener raftGroupListener,
+ boolean isVolatileStorage,
+ SnapshotStorageFactory snapshotStorageFactory,
+ Consumer<RaftGroupService> updateTableRaftService,
+ Function<RaftGroupService, ReplicaListener> createListener,
+ PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+ TablePartitionId replicaGrpId,
+ PeersAndLearners newConfiguration
) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- return startReplicaInternal(replicaGrpId, listener, raftClient,
storageIndexTracker);
+ return startReplicaInternal(
+ raftGroupEventsListener,
+ raftGroupListener,
+ isVolatileStorage,
+ snapshotStorageFactory,
+ updateTableRaftService,
+ createListener,
+ storageIndexTracker,
+ replicaGrpId,
+ newConfiguration);
} finally {
busyLock.leaveBusy();
}
}
/**
- * Internal method for starting a replica.
+ * Passes the raft client from {@code newRaftClientFut} to replica creation and starts the replica. If a replica
with the same partition id
+ * already exists, the method throws an exception.
+ * TODO: must be deleted or be private after
https://issues.apache.org/jira/browse/IGNITE-22373
*
* @param replicaGrpId Replication group id.
- * @param listener Replica listener.
- * @param raftClient Topology aware Raft client.
+ * @param newConfiguration Peers and Learners of the Raft group.
+ * @param updateTableRaftService A temporary closure that updates the table
raft service with the new raft client.
+ * TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22218
+ * @param createListener A closure that returns a ready {@link
ReplicaListener} for the given raft client {@link RaftGroupService}.
* @param storageIndexTracker Storage index tracker.
+ * @param newRaftClientFut A future that returns created raft-client.
+ * @throws NodeStoppingException If node is stopping.
+ * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with
the same replication group id has already been started.
*/
- private CompletableFuture<Replica> startReplicaInternal(
+ @VisibleForTesting
+ @Deprecated
+ public CompletableFuture<Boolean> startReplica(
ReplicationGroupId replicaGrpId,
- ReplicaListener listener,
- TopologyAwareRaftGroupService raftClient,
- PendingComparableValuesTracker<Long, Void> storageIndexTracker
- ) {
+ PeersAndLearners newConfiguration,
+ Consumer<RaftGroupService> updateTableRaftService,
+ Function<RaftGroupService, ReplicaListener> createListener,
+ PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+ CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut
+ ) throws NodeStoppingException {
LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
- ClusterNode localNode = clusterNetSvc.topologyService().localMember();
+ CompletableFuture<Boolean> resultFuture =
newRaftClientFut.thenAccept(updateTableRaftService)
+ .thenApply((v) -> true);
- Replica newReplica = new Replica(
- replicaGrpId,
- listener,
- storageIndexTracker,
- raftClient,
- localNode,
- executor,
- placementDriver,
- clockService
- );
+ CompletableFuture<ReplicaListener> newReplicaListenerFut =
newRaftClientFut.thenApply(createListener);
- CompletableFuture<Replica> replicaFuture =
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
- if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
- assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
- LOG.info("Replica is started [replicationGroupId={}].",
replicaGrpId);
+ startReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut);
- return completedFuture(newReplica);
- } else {
- existingReplicaFuture.complete(newReplica);
- LOG.info("Replica is started, existing replica waiter was
completed [replicationGroupId={}].", replicaGrpId);
+ return resultFuture;
+ }
- return existingReplicaFuture;
- }
+ /**
+ * Creates and starts a new replica.
+ * TODO: must be deleted or be private after
https://issues.apache.org/jira/browse/IGNITE-22373
+ *
+ * @param replicaGrpId Replication group id.
+ * @param storageIndexTracker Storage index tracker.
+ * @param newReplicaListenerFut Future that returns ready ReplicaListener
for replica creation.
+ * @return Future that promises ready new replica when done.
+ */
+ @VisibleForTesting
+ @Deprecated
+ public CompletableFuture<Replica> startReplica(
+ ReplicationGroupId replicaGrpId,
+ PendingComparableValuesTracker<Long, Void> storageIndexTracker,
+ CompletableFuture<ReplicaListener> newReplicaListenerFut
+ ) throws NodeStoppingException {
+
+ ClusterNode localNode = clusterNetSvc.topologyService().localMember();
+
+ CompletableFuture<Replica> replicaFuture =
newReplicaListenerFut.thenCompose(listener -> {
+ Replica newReplica = new Replica(
+ replicaGrpId,
+ listener,
+ storageIndexTracker,
+ localNode,
+ executor,
+ placementDriver,
+ clockService);
+
+ return replicas.compute(replicaGrpId, (k, existingReplicaFuture)
-> {
+ if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
+ assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
+ LOG.info("Replica is started [replicationGroupId={}].",
replicaGrpId);
+
+ return CompletableFuture.completedFuture(newReplica);
+ } else {
+ LOG.info("Replica is started, existing replica waiter was
completed [replicationGroupId={}].", replicaGrpId);
+
+ existingReplicaFuture.complete(newReplica);
+
+ return existingReplicaFuture;
+ }
+ });
});
var eventParams = new LocalReplicaEventParameters(replicaGrpId);
@@ -548,6 +658,76 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
.thenCompose(v -> replicaFuture);
}
+ /**
+ * Temporary public method for starting a RAFT client (or returning the cached one, if present).
+ * TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
+ *
+ * @param replicaGrpId Replication Group ID.
+ * @param newConfiguration Peers and learners nodes for a raft group.
+ * @param raftClientCache Temporary supplier that returns the RAFT client from
TableRaftService if it already exists and was put into the
+ * service's map.
+ * @return Future that returns started RAFT-client.
+ * @throws NodeStoppingException In case if node was stopping.
+ */
+ @Deprecated
+ public CompletableFuture<TopologyAwareRaftGroupService> startRaftClient(
+ ReplicationGroupId replicaGrpId,
+ PeersAndLearners newConfiguration,
+ Supplier<RaftGroupService> raftClientCache)
+ throws NodeStoppingException {
+ RaftGroupService cachedRaftClient = raftClientCache.get();
+ return cachedRaftClient != null
+ ?
CompletableFuture.completedFuture((TopologyAwareRaftGroupService)
cachedRaftClient)
+ // TODO IGNITE-19614 This procedure takes 10 seconds if
there's no majority online.
+ : raftManager.startRaftGroupService(replicaGrpId,
newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller);
+ }
+
+ /**
+ * Returns a future with the replica if it was created, or null if there is no replica
with the given identifier.
+ *
+ * @param replicationGroupId Replication group identifier.
+ * @return Future of the replica if it was created, or null otherwise.
+ */
+ public CompletableFuture<Replica> replica(ReplicationGroupId
replicationGroupId) {
+ return replicas.get(replicationGroupId);
+ }
+
+ /**
+ * Performs a {@code resetPeers} operation on raft node.
+ *
+ * @param replicaGrpId Replication group ID.
+ * @param peersAndLearners New node configuration.
+ */
+ public void resetPeers(ReplicationGroupId replicaGrpId, PeersAndLearners
peersAndLearners) {
+ RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+ ((Loza) raftManager).resetPeers(raftNodeId, peersAndLearners);
+ }
+
+ /** Getter for wrapped write-ahead log syncer. */
+ // TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22292
+ public LogSyncer getLogSyncer() {
+ return raftManager.getLogSyncer();
+ }
+
+ private RaftGroupOptions groupOptionsForPartition(boolean
isVolatileStorage, SnapshotStorageFactory snapshotFactory) {
+ RaftGroupOptions raftGroupOptions;
+
+ if (isVolatileStorage) {
+ LogStorageBudgetView view = ((Loza)
raftManager).volatileRaft().logStorage().value();
+ raftGroupOptions = RaftGroupOptions.forVolatileStores()
+
.setLogStorageFactory(volatileLogStorageFactoryCreator.factory(view))
+ .raftMetaStorageFactory((groupId, raftOptions) -> new
VolatileRaftMetaStorage());
+ } else {
+ raftGroupOptions = RaftGroupOptions.forPersistentStores();
+ }
+
+ raftGroupOptions.snapshotStorageFactory(snapshotFactory);
+
+ raftGroupOptions.commandsMarshaller(raftCommandsMarshaller);
+
+ return raftGroupOptions;
+ }
+
/**
* Stops a replica by the partition group id.
*
@@ -620,7 +800,16 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
});
- return isRemovedFuture;
+ return isRemovedFuture
+ .thenApply(v -> {
+ try {
+ // TODO: move into {@link Replica#shutdown}
https://issues.apache.org/jira/browse/IGNITE-22372
+ raftManager.stopRaftNodes(replicaGrpId);
+ } catch (NodeStoppingException ignored) {
+ // No-op.
+ }
+ return v;
+ });
}
/** {@inheritDoc} */
@@ -650,6 +839,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
localNodeId = clusterNetSvc.topologyService().localMember().id();
+ localNodeConsistentId =
clusterNetSvc.topologyService().localMember().name();
+
return nullCompletedFuture();
}
@@ -662,8 +853,10 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
busyLock.block();
- shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, 10,
TimeUnit.SECONDS);
- shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+ int shutdownTimeoutSeconds = 10;
+
+ shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds,
TimeUnit.SECONDS);
assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
: "There are replicas alive [replicas="
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
index 88a1937e97..3bb7b86822 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
@@ -18,11 +18,11 @@
package org.apache.ignite.internal.replicator.listener;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
/** Replica listener. */
-@FunctionalInterface
public interface ReplicaListener {
/**
* Invokes a replica listener to process request.
@@ -33,6 +33,9 @@ public interface ReplicaListener {
*/
CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String
senderId);
+ /** Returns the RAFT client. */
+ RaftCommandRunner raftClient();
+
/** Callback on replica shutdown. */
default void onShutdown() {
// No-op.
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index 676eb5ec36..57bc8420d8 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -115,11 +115,13 @@ public class PlacementDriverReplicaSideTest extends
BaseIgniteAbstractTest {
when(raftClient.run(any())).thenAnswer(invocationOnMock ->
completedFuture(null));
+ var listener = mock(ReplicaListener.class);
+ when(listener.raftClient()).thenReturn(raftClient);
+
return new Replica(
GRP_ID,
- mock(ReplicaListener.class),
+ listener,
storageIndexTracker,
- raftClient,
LOCAL_NODE,
executor,
new TestPlacementDriver(LOCAL_NODE),
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index f6e661b2a8..4a7d1462a1 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.replicator;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
+import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -50,7 +52,12 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -75,6 +82,9 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
private ReplicaManager replicaManager;
+ @Mock
+ private RaftManager raftManager;
+
@BeforeEach
void startReplicaManager(
TestInfo testInfo,
@@ -82,7 +92,10 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
@Mock ClusterManagementGroupManager cmgManager,
@Mock PlacementDriver placementDriver,
@Mock MessagingService messagingService,
- @Mock TopologyService topologyService
+ @Mock TopologyService topologyService,
+ @Mock Marshaller marshaller,
+ @Mock TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
+ @Mock VolatileLogStorageFactoryCreator
volatileLogStorageFactoryCreator
) {
String nodeName = testNodeName(testInfo, 0);
@@ -110,7 +123,12 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
Set.of(),
placementDriver,
requestsExecutor,
- new NoOpFailureProcessor()
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+ new NoOpFailureProcessor(),
+ marshaller,
+ raftGroupServiceFactory,
+ raftManager,
+ volatileLogStorageFactoryCreator
);
assertThat(replicaManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
@@ -140,6 +158,7 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
*/
@Test
void testReplicaEvents(
+ TestInfo testInfo,
@Mock EventListener<LocalReplicaEventParameters>
createReplicaListener,
@Mock EventListener<LocalReplicaEventParameters>
removeReplicaListener,
@Mock ReplicaListener replicaListener,
@@ -154,12 +173,18 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
replicaManager.listen(BEFORE_REPLICA_STOPPED, removeReplicaListener);
var groupId = new TablePartitionId(0, 0);
+ when(replicaListener.raftClient()).thenReturn(raftGroupService);
+
+ String nodeName = testNodeName(testInfo, 0);
+ PeersAndLearners newConfiguration =
PeersAndLearners.fromConsistentIds(Set.of(nodeName));
- CompletableFuture<Replica> startReplicaFuture =
replicaManager.startReplica(
+ CompletableFuture<Boolean> startReplicaFuture =
replicaManager.startReplica(
groupId,
- replicaListener,
- raftGroupService,
- new PendingComparableValuesTracker<>(0L)
+ newConfiguration,
+ (unused) -> { },
+ (unused) -> replicaListener,
+ new PendingComparableValuesTracker<>(0L),
+ completedFuture(raftGroupService)
);
assertThat(startReplicaFuture, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f419d3b45d..80051692c6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -176,6 +176,7 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
+import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -481,6 +482,9 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
clockService
);
+ ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
+ NamedThreadFactory.create(name, "test-rebalance-scheduler",
logger()));
+
ReplicaManager replicaMgr = new ReplicaManager(
name,
clusterSvc,
@@ -490,7 +494,11 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
placementDriverManager.placementDriver(),
threadPoolsManager.partitionOperationsExecutor(),
partitionIdleSafeTimePropagationPeriodMsSupplier,
- failureProcessor
+ failureProcessor,
+ new
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()),
+ topologyAwareRaftGroupServiceFactory,
+ raftMgr,
+ view -> new LocalLogStorageFactory()
);
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
@@ -570,9 +578,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
var dataNodesMock = dataNodesMockByNode.get(idx);
- ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
- NamedThreadFactory.create(name, "test-rebalance-scheduler",
logger()));
-
DistributionZoneManager distributionZoneManager = new
DistributionZoneManager(
name,
registry,
@@ -604,7 +609,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
clusterSvc.serializationRegistry(),
- raftMgr,
replicaMgr,
lockManager,
replicaService,
@@ -613,13 +617,12 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
storagePath,
metaStorageMgr,
schemaManager,
- view -> new LocalLogStorageFactory(),
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
+ rebalanceScheduler,
hybridClock,
clockService,
new OutgoingSnapshotsManager(clusterSvc.messagingService()),
- topologyAwareRaftGroupServiceFactory,
distributionZoneManager,
schemaSyncService,
catalogManager,
@@ -627,7 +630,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
placementDriverManager.placementDriver(),
sqlRef::get,
resourcesRegistry,
- rebalanceScheduler,
lowWatermark,
transactionInflights
);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3bc9aa2a6d..e966a158e3 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -156,6 +156,7 @@ import
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAf
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
@@ -206,6 +207,7 @@ import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOn
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
+import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
@@ -630,6 +632,14 @@ public class IgniteImpl implements Ignite {
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier =
partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
+ ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
+ NamedThreadFactory.create(name, "rebalance-scheduler", LOG));
+
+ // TODO: IGNITE-22222 this instantiation should be moved inside
ReplicaManager's constructor
+ Marshaller raftMarshaller = new
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry());
+
+ volatileLogStorageFactoryCreator = new
VolatileLogStorageFactoryCreator(name,
workDir.resolve("volatile-log-spillout"));
+
replicaMgr = new ReplicaManager(
name,
clusterSvc,
@@ -639,7 +649,11 @@ public class IgniteImpl implements Ignite {
placementDriverMgr.placementDriver(),
threadPoolsManager.partitionOperationsExecutor(),
partitionIdleSafeTimePropagationPeriodMsSupplier,
- failureProcessor
+ failureProcessor,
+ raftMarshaller,
+ topologyAwareRaftGroupServiceFactory,
+ raftMgr,
+ volatileLogStorageFactoryCreator
);
metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY));
@@ -670,8 +684,6 @@ public class IgniteImpl implements Ignite {
nodeConfigRegistry.getConfiguration(StorageConfiguration.KEY)
);
- volatileLogStorageFactoryCreator = new
VolatileLogStorageFactoryCreator(name,
workDir.resolve("volatile-log-spillout"));
-
outgoingSnapshotsManager = new OutgoingSnapshotsManager(name,
clusterSvc.messagingService());
LongSupplier delayDurationMsSupplier =
delayDurationMsSupplier(schemaSyncConfig);
@@ -697,9 +709,6 @@ public class IgniteImpl implements Ignite {
schemaManager = new SchemaManager(registry, catalogManager);
- ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
- NamedThreadFactory.create(name, "rebalance-scheduler", LOG));
-
distributionZoneManager = new DistributionZoneManager(
name,
registry,
@@ -771,7 +780,6 @@ public class IgniteImpl implements Ignite {
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
clusterSvc.serializationRegistry(),
- raftMgr,
replicaMgr,
lockMgr,
replicaSvc,
@@ -780,13 +788,12 @@ public class IgniteImpl implements Ignite {
storagePath,
metaStorageMgr,
schemaManager,
- volatileLogStorageFactoryCreator,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
+ rebalanceScheduler,
clock,
clockService,
outgoingSnapshotsManager,
- topologyAwareRaftGroupServiceFactory,
distributionZoneManager,
schemaSyncService,
catalogManager,
@@ -794,7 +801,6 @@ public class IgniteImpl implements Ignite {
placementDriverMgr.placementDriver(),
this::bareSql,
resourcesRegistry,
- rebalanceScheduler,
lowWatermark,
transactionInflights
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 92f1284085..fd14fd283b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.distributed;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE;
+import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.table.TxAbstractTest.startNode;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -45,6 +47,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -57,7 +61,12 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -67,8 +76,10 @@ import
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat
import
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -79,6 +90,7 @@ import
org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
@@ -126,8 +138,25 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
private ExecutorService requestsExecutor;
+ private Loza raftManager;
+
+ private TopologyAwareRaftGroupService raftClient;
+
+ private final Function<BiFunction<ReplicaRequest, String,
CompletableFuture<ReplicaResult>>, ReplicaListener> replicaListenerCreator =
+ (invokeImpl) -> new ReplicaListener() {
+ @Override
+ public CompletableFuture<ReplicaResult> invoke(ReplicaRequest
request, String senderId) {
+ return invokeImpl.apply(request, senderId);
+ }
+
+ @Override
+ public RaftCommandRunner raftClient() {
+ return raftClient;
+ }
+ };
+
@BeforeEach
- public void setup() {
+ public void setup() throws NodeStoppingException {
var networkAddress = new NetworkAddress(getLocalAddress(),
NODE_PORT_BASE + 1);
var nodeFinder = new StaticNodeFinder(List.of(networkAddress));
@@ -139,6 +168,10 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
// This test is run without Meta storage.
when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());
+ raftManager = mock(Loza.class);
+ raftClient = mock(TopologyAwareRaftGroupService.class);
+ when(raftManager.startRaftGroupService(any(), any(), any(),
any())).thenReturn(completedFuture(raftClient));
+
requestsExecutor = new ThreadPoolExecutor(
0, 5,
0, TimeUnit.SECONDS,
@@ -151,6 +184,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
clock,
replicationConfiguration
);
+
replicaManager = new ReplicaManager(
NODE_NAME,
clusterService,
@@ -159,7 +193,12 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
Set.of(TableMessageGroup.class, TxMessageGroup.class),
new
TestPlacementDriver(clusterService.topologyService().localMember()),
requestsExecutor,
- new NoOpFailureProcessor()
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+ new NoOpFailureProcessor(),
+ mock(ThreadLocalPartitionCommandsMarshaller.class),
+ mock(TopologyAwareRaftGroupServiceFactory.class),
+ raftManager,
+ view -> new LocalLogStorageFactory()
);
assertThat(replicaManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
@@ -184,18 +223,27 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
+ PeersAndLearners newConfiguration =
PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name()));
+
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
(message, sender, correlationId) -> {
try {
log.info("Replica msg " +
message.getClass().getSimpleName());
+ ReplicaListener listener =
replicaListenerCreator.apply((req, senderId) -> {
+ ReplicaResponse response =
replicaMessageFactory.replicaResponse()
+ .result(5)
+ .build();
+ return completedFuture(new ReplicaResult(response,
null));
+ });
+
replicaManager.startReplica(
tablePartitionId,
- (request0, senderId) -> completedFuture(new
ReplicaResult(replicaMessageFactory.replicaResponse()
- .result(5)
- .build(), null)),
- mock(TopologyAwareRaftGroupService.class),
- new PendingComparableValuesTracker<>(0L)
+ newConfiguration,
+ (unused) -> { },
+ (unused) -> listener,
+ new PendingComparableValuesTracker<>(0L),
+
completedFuture(mock(TopologyAwareRaftGroupService.class))
);
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
@@ -297,16 +345,22 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+ PeersAndLearners newConfiguration =
PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name()));
+
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
(message, sender, correlationId) -> {
runAsync(() -> {
try {
log.info("Replica msg " +
message.getClass().getSimpleName());
+ ReplicaListener listener =
replicaListenerCreator.apply((r, id) -> new CompletableFuture<>());
+
replicaManager.startReplica(
tablePartitionId,
- (request, senderId) -> new CompletableFuture<>(),
- mock(TopologyAwareRaftGroupService.class),
- new PendingComparableValuesTracker<>(0L)
+ newConfiguration,
+ (unused) -> { },
+ (unused) -> listener,
+ new PendingComparableValuesTracker<>(0L),
+
completedFuture(mock(TopologyAwareRaftGroupService.class))
);
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 2ea1aaeb48..15655475a0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThr
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -203,7 +204,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
Set.of()
);
- assertThat(updateFuture, willCompleteSuccessfully());
+ assertThat(updateFuture, willSucceedIn(60, SECONDS));
awaitPrimaryReplica(node0, partId);
@@ -249,7 +250,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
Set.of(anotherPartId)
);
- assertThat(updateFuture, willCompleteSuccessfully());
+ assertThat(updateFuture, willSucceedIn(60, SECONDS));
awaitPrimaryReplica(node0, anotherPartId);
@@ -302,7 +303,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
CompletableFuture<ReplicaMeta> awaitPrimaryReplicaFuture =
node0.placementDriver()
.awaitPrimaryReplica(new TablePartitionId(tableId, partId),
node0.clock().now(), 60, SECONDS);
- assertThat(awaitPrimaryReplicaFuture, willCompleteSuccessfully());
+ assertThat(awaitPrimaryReplicaFuture, willSucceedIn(60, SECONDS));
}
private void assertRealAssignments(IgniteImpl node0, int partId,
Integer... expected) throws InterruptedException {
@@ -330,7 +331,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
CompletableFuture<Void> insertFuture = keyValueView.putAsync(null,
key, Tuple.create(of("val", i + offset)));
try {
- insertFuture.get(1000, MILLISECONDS);
+ insertFuture.get(10, SECONDS);
Tuple value = keyValueView.get(null, key);
assertNotNull(value);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 39453a5142..9306da8ddd 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -155,7 +156,6 @@ import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
@@ -188,6 +188,7 @@ import
org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
+import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -820,9 +821,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private void verifyThatRaftNodesAndReplicasWereStartedOnlyOnce() throws
Exception {
for (int i = 0; i < NODE_COUNT; i++) {
verify(getNode(i).raftManager,
timeout(AWAIT_TIMEOUT_MILLIS).times(1))
- .startRaftGroupNodeWithoutService(any(), any(), any(),
any(), any(RaftGroupOptions.class));
+ .startRaftGroupNode(any(), any(), any(), any(), any(),
notNull(TopologyAwareRaftGroupServiceFactory.class));
verify(getNode(i).replicaManager,
timeout(AWAIT_TIMEOUT_MILLIS).times(1))
- .startReplica(any(), any(), any(), any());
+ .startReplica(any(), any(), any());
}
}
@@ -1194,6 +1195,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
lowWatermark
);
+ rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
+ NamedThreadFactory.create(name,
"test-rebalance-scheduler", logger()));
+
replicaManager = spy(new ReplicaManager(
name,
clusterService,
@@ -1203,7 +1207,11 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
placementDriver,
threadPoolsManager.partitionOperationsExecutor(),
partitionIdleSafeTimePropagationPeriodMsSupplier,
- new NoOpFailureProcessor()
+ new NoOpFailureProcessor(),
+ new
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
+ topologyAwareRaftGroupServiceFactory,
+ raftManager,
+ view -> new LocalLogStorageFactory()
));
LongSupplier delayDurationMsSupplier = () -> 10L;
@@ -1219,9 +1227,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
schemaSyncService = new
SchemaSyncServiceImpl(metaStorageManager.clusterTime(),
delayDurationMsSupplier);
- rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
- NamedThreadFactory.create(name,
"test-rebalance-scheduler", logger()));
-
distributionZoneManager = new DistributionZoneManager(
name,
registry,
@@ -1244,7 +1249,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
clusterService.messagingService(),
clusterService.topologyService(),
clusterService.serializationRegistry(),
- raftManager,
replicaManager,
mock(LockManager.class),
replicaSvc,
@@ -1253,13 +1257,12 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
storagePath,
metaStorageManager,
schemaManager,
- view -> new LocalLogStorageFactory(),
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
+ rebalanceScheduler,
clock,
clockService,
new
OutgoingSnapshotsManager(clusterService.messagingService()),
- topologyAwareRaftGroupServiceFactory,
distributionZoneManager,
schemaSyncService,
catalogManager,
@@ -1267,7 +1270,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
placementDriver,
() -> mock(IgniteSql.class),
resourcesRegistry,
- rebalanceScheduler,
lowWatermark,
transactionInflights
) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a239a046f4..ce5246a8f6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -92,6 +92,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@@ -141,23 +142,19 @@ import
org.apache.ignite.internal.network.MessagingService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
-import org.apache.ignite.internal.raft.RaftManager;
-import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
@@ -171,7 +168,6 @@ import
org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableRaftService;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
@@ -191,7 +187,6 @@ import
org.apache.ignite.internal.table.distributed.schema.ExecutorInclinedSchem
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
-import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
import
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
@@ -223,7 +218,6 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.util.IgniteNameUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyService;
-import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.Table;
import org.jetbrains.annotations.Nullable;
@@ -246,9 +240,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final TopologyService topologyService;
- /** Raft manager. */
- private final RaftManager raftMgr;
-
/** Replica manager. */
private final ReplicaManager replicaMgr;
@@ -318,11 +309,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/** Schema manager. */
private final SchemaManager schemaManager;
- private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
-
- /** Executor for scheduling rebalance routine. */
- private final ScheduledExecutorService rebalanceScheduler;
-
/** Transaction state storage scheduled pool. */
private final ScheduledExecutorService txStateStorageScheduledPool;
@@ -345,8 +331,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final OutgoingSnapshotsManager outgoingSnapshotsManager;
- private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
-
private final DistributionZoneManager distributionZoneManager;
private final SchemaSyncService executorInclinedSchemaSyncService;
@@ -369,8 +353,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final LowWatermark lowWatermark;
- private final Marshaller raftCommandsMarshaller;
-
private final HybridTimestampTracker observableTimestampTracker;
/** Placement driver. */
@@ -397,6 +379,9 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
*/
private final Executor partitionOperationsExecutor;
+ /** Executor for scheduling rebalance routine. */
+ private final ScheduledExecutorService rebalanceScheduler;
+
/** Marshallers provider. */
private final ReflectionMarshallersProvider marshallers = new
ReflectionMarshallersProvider();
@@ -426,23 +411,19 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param gcConfig Garbage collector configuration.
* @param txCfg Transaction configuration.
* @param storageUpdateConfig Storage update handler configuration.
- * @param raftMgr Raft manager.
* @param replicaMgr Replica manager.
* @param lockMgr Lock manager.
* @param replicaSvc Replica service.
* @param txManager Transaction manager.
* @param dataStorageMgr Data storage manager.
* @param schemaManager Schema manager.
- * @param volatileLogStorageFactoryCreator Creator for {@link
org.apache.ignite.internal.raft.storage.LogStorageFactory} for
- * volatile tables.
* @param ioExecutor Separate executor for IO operations like partition
storage initialization or partition raft group meta data
* persisting.
* @param partitionOperationsExecutor Striped executor on which partition
operations (potentially requiring I/O with storages)
* will be executed.
- * @param raftGroupServiceFactory Factory that is used for creation of
raft group services for replication groups.
+ * @param rebalanceScheduler Executor for scheduling rebalance routine.
* @param placementDriver Placement driver.
* @param sql A supplier function that returns {@link IgniteSql}.
- * @param rebalanceScheduler Executor for scheduling rebalance routine.
* @param lowWatermark Low watermark.
* @param transactionInflights Transaction inflights.
*/
@@ -455,7 +436,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
MessagingService messagingService,
TopologyService topologyService,
MessageSerializationRegistry messageSerializationRegistry,
- RaftManager raftMgr,
ReplicaManager replicaMgr,
LockManager lockMgr,
ReplicaService replicaSvc,
@@ -464,13 +444,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
Path storagePath,
MetaStorageManager metaStorageMgr,
SchemaManager schemaManager,
- LogStorageFactoryCreator volatileLogStorageFactoryCreator,
ExecutorService ioExecutor,
Executor partitionOperationsExecutor,
+ ScheduledExecutorService rebalanceScheduler,
HybridClock clock,
ClockService clockService,
OutgoingSnapshotsManager outgoingSnapshotsManager,
- TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
DistributionZoneManager distributionZoneManager,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
@@ -478,12 +457,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
PlacementDriver placementDriver,
Supplier<IgniteSql> sql,
RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
- ScheduledExecutorService rebalanceScheduler,
LowWatermark lowWatermark,
TransactionInflights transactionInflights
) {
this.topologyService = topologyService;
- this.raftMgr = raftMgr;
this.replicaMgr = replicaMgr;
this.lockMgr = lockMgr;
this.replicaSvc = replicaSvc;
@@ -491,20 +468,18 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
this.dataStorageMgr = dataStorageMgr;
this.metaStorageMgr = metaStorageMgr;
this.schemaManager = schemaManager;
- this.volatileLogStorageFactoryCreator =
volatileLogStorageFactoryCreator;
this.ioExecutor = ioExecutor;
this.partitionOperationsExecutor = partitionOperationsExecutor;
+ this.rebalanceScheduler = rebalanceScheduler;
this.clock = clock;
this.clockService = clockService;
this.outgoingSnapshotsManager = outgoingSnapshotsManager;
- this.raftGroupServiceFactory = raftGroupServiceFactory;
this.distributionZoneManager = distributionZoneManager;
this.catalogService = catalogService;
this.observableTimestampTracker = observableTimestampTracker;
this.sql = sql;
this.storageUpdateConfig = storageUpdateConfig;
this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
- this.rebalanceScheduler = rebalanceScheduler;
this.lowWatermark = lowWatermark;
this.transactionInflights = transactionInflights;
this.txCfg = txCfg;
@@ -565,8 +540,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
mvGc = new MvGc(nodeName, gcConfig, lowWatermark);
- raftCommandsMarshaller = new
ThreadLocalPartitionCommandsMarshaller(messageSerializationRegistry);
-
partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
metaStorageMgr,
messagingService,
@@ -581,7 +554,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
storagePath.resolve(TX_STATE_DIR),
txStateStorageScheduledPool,
txStateStoragePool,
- raftMgr.getLogSyncer(),
+ replicaMgr.getLogSyncer(),
TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER
);
@@ -922,11 +895,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
storageUpdateConfig
);
- Peer serverPeer = realConfiguration.peer(localNode().name());
-
- var raftNodeId = localMemberAssignment == null ? null : new
RaftNodeId(replicaGrpId, serverPeer);
-
- boolean shouldStartRaftListeners = localMemberAssignment != null &&
!((Loza) raftMgr).isStarted(raftNodeId);
+ boolean shouldStartRaftListeners =
shouldStartRaftListeners(assignments, nonStableNodeAssignments);
if (shouldStartRaftListeners) {
((InternalTableImpl) internalTbl).updatePartitionTrackers(partId,
safeTimeTracker, storageIndexTracker);
@@ -934,6 +903,24 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
}
+ // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22315
+ Supplier<RaftGroupService> getCachedRaftClient = () -> {
+ try {
+ // Return existing service if it's already started.
+ return internalTbl
+ .tableRaftService()
+ .partitionRaftGroupService(replicaGrpId.partitionId());
+ } catch (IgniteInternalException e) {
+ // We use "IgniteInternalException" in accordance with the
javadoc of "partitionRaftGroupService" method.
+ return null;
+ }
+ };
+
+ // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22218
+ Consumer<RaftGroupService> updateTableRaftService = (raftClient) ->
((InternalTableImpl) internalTbl)
+ .tableRaftService()
+ .updateInternalTableRaftGroupService(partId, raftClient);
+
CompletableFuture<Boolean> startGroupFut;
if (localMemberAssignment != null) {
@@ -946,85 +933,88 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
)
: trueCompletedFuture();
- startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup ->
inBusyLock(busyLock, () -> {
+ startGroupFut = shouldStartGroupFut.thenComposeAsync(startGroup ->
inBusyLock(busyLock, () -> {
+ // (1) if partitionReplicatorNodeRecovery#shouldStartGroup
fails -> do start nothing
if (!startGroup) {
- return false;
+ return falseCompletedFuture();
}
- if (((Loza) raftMgr).isStarted(raftNodeId)) {
+ // (2) if replica already started => check force reset and
finish the process
+ if (replicaMgr.isReplicaStarted(replicaGrpId)) {
if (nonStableNodeAssignments != null &&
nonStableNodeAssignments.force()) {
- ((Loza) raftMgr).resetPeers(raftNodeId,
configurationFromAssignments(nonStableNodeAssignments.nodes()));
+ replicaMgr.resetPeers(replicaGrpId,
configurationFromAssignments(nonStableNodeAssignments.nodes()));
}
-
- return true;
+ return trueCompletedFuture();
}
+ // (3) Otherwise let's start replica manually
+ InternalTable internalTable = table.internalTable();
+
+ RaftGroupListener raftGroupListener = new PartitionListener(
+ txManager,
+ partitionDataStorage,
+ partitionUpdateHandlers.storageUpdateHandler,
+ partitionStorages.getTxStateStorage(),
+ safeTimeTracker,
+ storageIndexTracker,
+ catalogService,
+ table.schemaView(),
+ clockService
+ );
+
+ SnapshotStorageFactory snapshotStorageFactory =
createSnapshotStorageFactory(replicaGrpId,
+ partitionUpdateHandlers, internalTable);
+
+ Function<RaftGroupService, ReplicaListener> createListener =
(raftClient) -> createReplicaListener(
+ replicaGrpId,
+ table,
+ safeTimeTracker,
+ partitionStorages.getMvPartitionStorage(),
+ partitionStorages.getTxStateStorage(),
+ partitionUpdateHandlers,
+ raftClient);
+
+ RaftGroupEventsListener raftGroupEventsListener =
createRaftGroupEventsListener(zoneId, replicaGrpId);
+
+ MvTableStorage mvTableStorage = internalTable.storage();
+
try {
- startPartitionRaftGroupNode(
- replicaGrpId,
- raftNodeId,
- newConfiguration,
- safeTimeTracker,
+ var ret = replicaMgr.startReplica(
+ raftGroupEventsListener,
+ raftGroupListener,
+ mvTableStorage.isVolatile(),
+ snapshotStorageFactory,
+ updateTableRaftService,
+ createListener,
storageIndexTracker,
- table,
- partitionStorages.getTxStateStorage(),
- partitionDataStorage,
- partitionUpdateHandlers,
- zoneId
- );
-
- return true;
- } catch (NodeStoppingException ex) {
- throw new CompletionException(ex);
+ replicaGrpId,
+ newConfiguration);
+ return ret;
+ } catch (NodeStoppingException e) {
+ throw new AssertionError("Loza was stopped before Table
manager", e);
}
}), ioExecutor);
} else {
+ // TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
+ // (4) in case if node not in the assignments
startGroupFut = falseCompletedFuture();
}
startGroupFut
- .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
- TableRaftService tableRaftService =
table.internalTable().tableRaftService();
-
- try {
- // Return existing service if it's already started.
- return completedFuture(
- (TopologyAwareRaftGroupService)
tableRaftService.partitionRaftGroupService(replicaGrpId.partitionId())
- );
- } catch (IgniteInternalException e) {
- // We use "IgniteInternalException" in accordance with
the javadoc of "partitionRaftGroupService" method.
- try {
- // TODO IGNITE-19614 This procedure takes 10
seconds if there's no majority online.
- return raftMgr
- .startRaftGroupService(replicaGrpId,
newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller);
- } catch (NodeStoppingException ex) {
- return failedFuture(ex);
- }
- }
- }), ioExecutor)
- .thenAcceptAsync(updatedRaftGroupService ->
inBusyLock(busyLock, () -> {
- ((InternalTableImpl) internalTbl).tableRaftService()
- .updateInternalTableRaftGroupService(partId,
updatedRaftGroupService);
-
- boolean startedRaftNode = startGroupFut.join();
- if (localMemberAssignment == null || !startedRaftNode ||
replicaMgr.isReplicaStarted(replicaGrpId)) {
- return;
+ // TODO: the stage will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
+ .thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, ()
-> {
+ if (isReplicaStarted) {
+ return nullCompletedFuture();
}
+ CompletableFuture<TopologyAwareRaftGroupService>
newRaftClientFut;
try {
- startReplicaWithNewListener(
- replicaGrpId,
- table,
- safeTimeTracker,
- storageIndexTracker,
- partitionStorages.getMvPartitionStorage(),
- partitionStorages.getTxStateStorage(),
- partitionUpdateHandlers,
- updatedRaftGroupService
- );
- } catch (NodeStoppingException ex) {
- throw new AssertionError("Loza was stopped before
Table manager", ex);
+ newRaftClientFut = replicaMgr.startRaftClient(
+ replicaGrpId, newConfiguration,
getCachedRaftClient);
+ } catch (NodeStoppingException e) {
+ throw new CompletionException(e);
}
+ return newRaftClientFut.thenAccept(updateTableRaftService);
}), ioExecutor)
.whenComplete((res, ex) -> {
if (ex != null) {
@@ -1039,31 +1029,36 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return resultFuture;
}
- private void startReplicaWithNewListener(
- TablePartitionId replicaGrpId,
- TableImpl table,
- PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker,
- PendingComparableValuesTracker<Long, Void> storageIndexTracker,
- MvPartitionStorage mvPartitionStorage,
- TxStateStorage txStatePartitionStorage,
- PartitionUpdateHandlers partitionUpdateHandlers,
- TopologyAwareRaftGroupService raftGroupService
- ) throws NodeStoppingException {
- PartitionReplicaListener listener = createReplicaListener(
- replicaGrpId,
- table,
- safeTimeTracker,
- mvPartitionStorage,
- txStatePartitionStorage,
- partitionUpdateHandlers,
- raftGroupService
- );
+ private boolean shouldStartRaftListeners(Assignments assignments,
@Nullable Assignments nonStableNodeAssignments) {
+ Set<Assignment> nodesForStarting = nonStableNodeAssignments == null
+ ? assignments.nodes()
+ : RebalanceUtil.subtract(nonStableNodeAssignments.nodes(),
assignments.nodes());
+ return nodesForStarting
+ .stream()
+ .anyMatch(assignment ->
assignment.consistentId().equals(localNode().name()));
+ }
+
+ private PartitionMover createPartitionMover(TablePartitionId replicaGrpId)
{
+ return new PartitionMover(busyLock, () -> {
+ CompletableFuture<Replica> replicaFut =
replicaMgr.replica(replicaGrpId);
+ if (replicaFut == null) {
+ return failedFuture(new IgniteInternalException("No such
replica for partition " + replicaGrpId.partitionId()
+ + " in table " + replicaGrpId.tableId()));
+ }
+ return replicaFut.thenApply(Replica::raftClient);
+ });
+ }
+
+ private RaftGroupEventsListener createRaftGroupEventsListener(int zoneId,
TablePartitionId replicaGrpId) {
+ PartitionMover partitionMover = createPartitionMover(replicaGrpId);
- replicaMgr.startReplica(
+ return new RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
replicaGrpId,
- listener,
- raftGroupService,
- storageIndexTracker
+ busyLock,
+ partitionMover,
+ rebalanceScheduler,
+ zoneId
);
}
@@ -1122,46 +1117,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return new PartitionKey(internalTbl.tableId(), partId);
}
- private RaftGroupOptions groupOptionsForPartition(
- MvTableStorage mvTableStorage,
- TxStateTableStorage txStateTableStorage,
- PartitionKey partitionKey,
- PartitionUpdateHandlers partitionUpdateHandlers
- ) {
- RaftGroupOptions raftGroupOptions;
-
- if (mvTableStorage.isVolatile()) {
- raftGroupOptions = RaftGroupOptions.forVolatileStores()
- // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
-
.setLogStorageFactory(volatileLogStorageFactoryCreator.factory(((Loza)
raftMgr).volatileRaft().logStorage().value()))
- .raftMetaStorageFactory((groupId, raftOptions) -> new
VolatileRaftMetaStorage());
- } else {
- raftGroupOptions = RaftGroupOptions.forPersistentStores();
- }
-
- raftGroupOptions.snapshotStorageFactory(new
PartitionSnapshotStorageFactory(
- topologyService,
- outgoingSnapshotsManager,
- new PartitionAccessImpl(
- partitionKey,
- mvTableStorage,
- txStateTableStorage,
- mvGc,
- partitionUpdateHandlers.indexUpdateHandler,
- partitionUpdateHandlers.gcUpdateHandler,
- fullStateTransferIndexChooser,
- schemaManager.schemaRegistry(partitionKey.tableId()),
- lowWatermark
- ),
- catalogService,
- incomingSnapshotsExecutor
- ));
-
- raftGroupOptions.commandsMarshaller(raftCommandsMarshaller);
-
- return raftGroupOptions;
- }
-
@Override
public void beforeNodeStop() {
if (!beforeStopGuard.compareAndSet(false, true)) {
@@ -1194,11 +1149,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
mvGc,
fullStateTransferIndexChooser,
sharedTxStateStorage,
- () -> shutdownAndAwaitTermination(rebalanceScheduler,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() -> shutdownAndAwaitTermination(txStateStoragePool,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() ->
shutdownAndAwaitTermination(txStateStorageScheduledPool,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() -> shutdownAndAwaitTermination(scanRequestExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() ->
shutdownAndAwaitTermination(incomingSnapshotsExecutor, shutdownTimeoutSeconds,
TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(rebalanceScheduler,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() -> shutdownAndAwaitTermination(streamerFlushExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS)
);
} catch (Exception e) {
@@ -1857,7 +1812,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
boolean isRecovery
) {
ClusterNode localMember = localNode();
- RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNode().name()));
boolean pendingAssignmentsAreForced = pendingAssignments.force();
Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
@@ -1923,8 +1877,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}), ioExecutor);
} else {
localServicesStartFuture = runAsync(() -> {
- if (pendingAssignmentsAreForced && ((Loza)
raftMgr).isStarted(raftNodeId)) {
- ((Loza) raftMgr).resetPeers(raftNodeId,
configurationFromAssignments(nonStableNodeAssignmentsFinal.nodes()));
+ if (pendingAssignmentsAreForced &&
replicaMgr.isReplicaStarted(replicaGrpId)) {
+ replicaMgr.resetPeers(replicaGrpId,
configurationFromAssignments(nonStableNodeAssignmentsFinal.nodes()));
}
}, ioExecutor);
}
@@ -2047,55 +2001,29 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
});
}
- private void startPartitionRaftGroupNode(
+ private SnapshotStorageFactory createSnapshotStorageFactory(
TablePartitionId replicaGrpId,
- RaftNodeId raftNodeId,
- PeersAndLearners stableConfiguration,
- PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker,
- PendingComparableValuesTracker<Long, Void> storageIndexTracker,
- TableImpl table,
- TxStateStorage txStatePartitionStorage,
- PartitionDataStorage partitionDataStorage,
PartitionUpdateHandlers partitionUpdateHandlers,
- int zoneId
- ) throws NodeStoppingException {
- InternalTable internalTable = table.internalTable();
-
- RaftGroupOptions groupOptions = groupOptionsForPartition(
- internalTable.storage(),
- internalTable.txStateStorage(),
- partitionKey(internalTable, replicaGrpId.partitionId()),
- partitionUpdateHandlers
- );
+ InternalTable internalTable
+ ) {
+ PartitionKey partitionKey = partitionKey(internalTable,
replicaGrpId.partitionId());
- RaftGroupListener raftGrpLsnr = new PartitionListener(
- txManager,
- partitionDataStorage,
- partitionUpdateHandlers.storageUpdateHandler,
- txStatePartitionStorage,
- safeTimeTracker,
- storageIndexTracker,
+ return new PartitionSnapshotStorageFactory(
+ topologyService,
+ outgoingSnapshotsManager,
+ new PartitionAccessImpl(
+ partitionKey,
+ internalTable.storage(),
+ internalTable.txStateStorage(),
+ mvGc,
+ partitionUpdateHandlers.indexUpdateHandler,
+ partitionUpdateHandlers.gcUpdateHandler,
+ fullStateTransferIndexChooser,
+ schemaManager.schemaRegistry(partitionKey.tableId()),
+ lowWatermark
+ ),
catalogService,
- table.schemaView(),
- clockService
- );
-
- RaftGroupEventsListener raftGrpEvtsLsnr = new
RebalanceRaftGroupEventsListener(
- metaStorageMgr,
- replicaGrpId,
- busyLock,
- createPartitionMover(internalTable,
replicaGrpId.partitionId()),
- rebalanceScheduler,
- zoneId
- );
-
- // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
- ((Loza) raftMgr).startRaftGroupNodeWithoutService(
- raftNodeId,
- stableConfiguration,
- raftGrpLsnr,
- raftGrpEvtsLsnr,
- groupOptions
+ incomingSnapshotsExecutor
);
}
@@ -2169,10 +2097,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
};
}
- private PartitionMover createPartitionMover(InternalTable internalTable,
int partId) {
- return new PartitionMover(busyLock, () ->
internalTable.tableRaftService().partitionRaftGroupService(partId));
- }
-
private static PeersAndLearners
configurationFromAssignments(Collection<Assignment> assignments) {
var peers = new HashSet<String>();
var learners = new HashSet<String>();
@@ -2382,15 +2306,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
return stopReplicaFuture
- .thenCompose(v -> {
- try {
- raftMgr.stopRaftNodes(tablePartitionId);
- } catch (NodeStoppingException ignored) {
- // No-op.
- }
-
- return mvGc.removeStorage(tablePartitionId);
- });
+ .thenCompose(v -> mvGc.removeStorage(tablePartitionId));
}
private CompletableFuture<Void> destroyPartitionStorages(TablePartitionId
tablePartitionId, TableImpl table) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b38ddbc09c..406954e831 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -452,6 +453,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+ /** Returns Raft-client. */
+ @Override
+ public RaftCommandRunner raftClient() {
+ if (raftClient instanceof ExecutorInclinedRaftCommandRunner) {
+ return ((ExecutorInclinedRaftCommandRunner)
raftClient).decoratedCommandRunner();
+ }
+ return raftClient;
+ }
+
private CompletableFuture<?> processRequest(ReplicaRequest request,
@Nullable Boolean isPrimary, String senderId,
@Nullable Long leaseStartTime) {
if (request instanceof SchemaVersionAwareReplicaRequest) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
index 261eb1461a..ce75a0e26a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
@@ -66,7 +67,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
.thenReturn(failedFuture(new IOException()))
.thenReturn(nullCompletedFuture());
- var partitionMover = new PartitionMover(new IgniteSpinBusyLock(), ()
-> raftService);
+ var partitionMover = new PartitionMover(new IgniteSpinBusyLock(), ()
-> completedFuture(raftService));
assertThat(partitionMover.movePartition(PEERS_AND_LEARNERS, TERM),
willCompleteSuccessfully());
@@ -77,7 +78,9 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
public void testComponentStop() {
var lock = new IgniteSpinBusyLock();
- var partitionMover = new PartitionMover(lock, () ->
mock(RaftGroupService.class));
+ RaftGroupService raftService = mock(RaftGroupService.class);
+
+ var partitionMover = new PartitionMover(lock, () ->
completedFuture(raftService));
lock.block();
@@ -93,7 +96,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
when(raftService.changePeersAsync(any(), anyLong()))
.then(invocation -> CompletableFuture.runAsync(lock::block));
- var partitionMover = new PartitionMover(lock, () -> raftService);
+ var partitionMover = new PartitionMover(lock, () ->
completedFuture(raftService));
assertThat(partitionMover.movePartition(PEERS_AND_LEARNERS, TERM),
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index d8233857e3..0a857939f1 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -26,6 +26,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
@@ -87,9 +88,7 @@ import
org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
@@ -277,6 +276,12 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
when(topologyService.localMember()).thenReturn(node);
when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(emptySetCompletedFuture());
+ when(replicaMgr.getLogSyncer()).thenReturn(mock(LogSyncer.class));
+ when(replicaMgr.startReplica(any(), any(), any(), any(), any(), any()))
+ .thenReturn(nullCompletedFuture());
+ // TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
+ when(replicaMgr.startRaftClient(any(), any(), any()))
+
.thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
try (MockedStatic<SchemaUtils> schemaServiceMock =
mockStatic(SchemaUtils.class)) {
@@ -314,7 +319,6 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
clusterService.messagingService(),
clusterService.topologyService(),
clusterService.serializationRegistry(),
- rm,
replicaMgr,
null,
null,
@@ -323,13 +327,12 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
workDir,
metaStorageManager,
sm = new SchemaManager(revisionUpdater, catalogManager),
- budgetView -> new LocalLogStorageFactory(),
partitionOperationsExecutor,
partitionOperationsExecutor,
+ mock(ScheduledExecutorService.class),
clock,
clockService,
new
OutgoingSnapshotsManager(clusterService.messagingService()),
- mock(TopologyAwareRaftGroupServiceFactory.class),
distributionZoneManager,
new AlwaysSyncedSchemaSyncService(),
catalogManager,
@@ -337,7 +340,6 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
placementDriver,
() -> mock(IgniteSql.class),
new RemotelyTriggeredResourceRegistry(),
- mock(ScheduledExecutorService.class),
lowWatermark,
new TransactionInflights(placementDriver, clockService)
) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index dcfad44ebc..9c80784c5f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atMost;
@@ -102,11 +103,7 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
@@ -281,6 +278,11 @@ public class TableManagerTest extends IgniteAbstractTest {
when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(emptySetCompletedFuture());
+ when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(),
any(), any(), any(), any()))
+ .thenReturn(trueCompletedFuture());
+ // TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
+ when(replicaMgr.startRaftClient(any(), any(), any()))
+
.thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
tblManagerFut = new CompletableFuture<>();
@@ -556,8 +558,6 @@ public class TableManagerTest extends IgniteAbstractTest {
private IgniteBiTuple<TableViewInternal, TableManager>
startTableManagerStopTest() throws Exception {
TableViewInternal table =
mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut);
- verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(),
any(), any());
-
TableManager tableManager = tblManagerFut.join();
return new IgniteBiTuple<>(table, tableManager);
@@ -569,7 +569,6 @@ public class TableManagerTest extends IgniteAbstractTest {
tableManager.beforeNodeStop();
assertThat(tableManager.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
- verify(rm, times(PARTITIONS)).stopRaftNodes(any());
verify(replicaMgr, times(PARTITIONS)).stopReplica(any());
verify(table.internalTable().storage()).close();
@@ -716,15 +715,6 @@ public class TableManagerTest extends IgniteAbstractTest {
) throws Exception {
String consistentId = "node0";
- when(rm.startRaftGroupService(any(), any(), any(),
any())).thenAnswer(mock -> {
- RaftGroupService raftGrpSrvcMock =
mock(TopologyAwareRaftGroupService.class);
-
- when(raftGrpSrvcMock.leader()).thenReturn(new Peer(consistentId));
-
- return completedFuture(raftGrpSrvcMock);
- });
-
- // TODO: useless code
https://issues.apache.org/jira/browse/IGNITE-22388
when(ts.getByConsistentId(any())).thenReturn(new ClusterNodeImpl(
UUID.randomUUID().toString(),
consistentId,
@@ -795,7 +785,6 @@ public class TableManagerTest extends IgniteAbstractTest {
clusterService.messagingService(),
clusterService.topologyService(),
clusterService.serializationRegistry(),
- rm,
replicaMgr,
null,
null,
@@ -804,13 +793,12 @@ public class TableManagerTest extends IgniteAbstractTest {
workDir,
msm,
sm = new SchemaManager(revisionUpdater, catalogManager),
- budgetView -> new LocalLogStorageFactory(),
partitionOperationsExecutor,
partitionOperationsExecutor,
+ mock(ScheduledExecutorService.class),
clock,
new TestClockService(clock),
new
OutgoingSnapshotsManager(clusterService.messagingService()),
- mock(TopologyAwareRaftGroupServiceFactory.class),
distributionZoneManager,
new AlwaysSyncedSchemaSyncService(),
catalogManager,
@@ -818,7 +806,6 @@ public class TableManagerTest extends IgniteAbstractTest {
new TestPlacementDriver(node),
() -> mock(IgniteSql.class),
new RemotelyTriggeredResourceRegistry(),
- mock(ScheduledExecutorService.class),
lowWatermark,
mock(TransactionInflights.class)
) {
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 84030d69bf..d242bf8be0 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.raft.TestLozaFactory;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -384,8 +385,10 @@ public class ItTxTestCluster {
HybridClock clock = new HybridClockImpl();
TestClockService clockService = new TestClockService(clock);
- clocks.put(node.name(), clock);
- clockServices.put(node.name(), clockService);
+ String nodeName = node.name();
+
+ clocks.put(nodeName, clock);
+ clockServices.put(nodeName, clockService);
var raftSrv = TestLozaFactory.create(
clusterService,
@@ -396,27 +399,41 @@ public class ItTxTestCluster {
assertThat(raftSrv.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- raftServers.put(node.name(), raftSrv);
+ raftServers.put(nodeName, raftSrv);
var cmgManager = mock(ClusterManagementGroupManager.class);
// This test is run without Meta storage.
when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());
+ var commandMarshaller = new
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
+
+ var raftClientFactory = new TopologyAwareRaftGroupServiceFactory(
+ clusterService,
+ logicalTopologyService(clusterService),
+ Loza.FACTORY,
+ new RaftGroupEventsClientListener()
+ );
+
ReplicaManager replicaMgr = new ReplicaManager(
- node.name(),
+ nodeName,
clusterService,
cmgManager,
clockService,
Set.of(TableMessageGroup.class, TxMessageGroup.class),
placementDriver,
partitionOperationsExecutor,
- new NoOpFailureProcessor()
+ () ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+ new NoOpFailureProcessor(),
+ commandMarshaller,
+ raftClientFactory,
+ raftSrv,
+ new VolatileLogStorageFactoryCreator(nodeName,
workDir.resolve("volatile-log-spillout"))
);
assertThat(replicaMgr.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- replicaManagers.put(node.name(), replicaMgr);
+ replicaManagers.put(nodeName, replicaMgr);
LOG.info("Replica manager has been started, node=[" + node + ']');
@@ -428,15 +445,15 @@ public class ItTxTestCluster {
executor
));
- replicaServices.put(node.name(), replicaSvc);
+ replicaServices.put(nodeName, replicaSvc);
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
TransactionInflights transactionInflights = new
TransactionInflights(placementDriver, clockService);
- txInflights.put(node.name(), transactionInflights);
+ txInflights.put(nodeName, transactionInflights);
- cursorRegistries.put(node.name(), resourcesRegistry);
+ cursorRegistries.put(nodeName, resourcesRegistry);
TxManagerImpl txMgr = newTxManager(
clusterService,
@@ -451,7 +468,7 @@ public class ItTxTestCluster {
);
ResourceVacuumManager resourceVacuumManager = new
ResourceVacuumManager(
- node.name(),
+ nodeName,
resourcesRegistry,
clusterService.topologyService(),
clusterService.messagingService(),
@@ -460,12 +477,12 @@ public class ItTxTestCluster {
);
assertThat(txMgr.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- txManagers.put(node.name(), txMgr);
+ txManagers.put(nodeName, txMgr);
assertThat(resourceVacuumManager.startAsync(new
ComponentContext()), willCompleteSuccessfully());
- resourceCleanupManagers.put(node.name(), resourceVacuumManager);
+ resourceCleanupManagers.put(nodeName, resourceVacuumManager);
- txStateStorages.put(node.name(), new TestTxStateStorage());
+ txStateStorages.put(nodeName, new TestTxStateStorage());
}
LOG.info("Raft servers have been started");
@@ -689,9 +706,8 @@ public class ItTxTestCluster {
replicaManagers.get(assignment).startReplica(
new TablePartitionId(tableId, partId),
- listener,
- raftSvc,
- storageIndexTracker
+ storageIndexTracker,
+ completedFuture(listener)
);
} catch (NodeStoppingException e) {
fail("Unexpected node stopping", e);