This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5fc56a61e82 IGNITE-27157 Adapt CMG services to use timeout (#7509)
5fc56a61e82 is described below
commit 5fc56a61e82f5de84ebdc8692797d854f7ec6cb4
Author: Cyrill <[email protected]>
AuthorDate: Tue Mar 17 13:22:47 2026 +0300
IGNITE-27157 Adapt CMG services to use timeout (#7509)
Co-authored-by: Kirill Sizov <[email protected]>
---
modules/cluster-management/build.gradle | 1 +
.../management/raft/ItCmgRaftServiceTest.java | 48 +++++++---
.../management/ClusterManagementGroupManager.java | 104 ++++++++++++++++++---
.../cluster/management/raft/CmgRaftService.java | 72 ++++++++------
...ysicalTopologyAwareRaftGroupServiceFactory.java | 81 ++++++++++++++++
.../ClusterManagementGroupManagerTest.java | 18 ++--
.../management/raft/CmgRaftServiceTest.java | 7 +-
.../internal/cluster/management/MockNode.java | 12 ++-
.../rebalance/ItRebalanceDistributedTest.java | 1 +
.../ItMetaStorageMultipleNodesAbstractTest.java | 1 +
.../metastorage/impl/ItMetaStorageWatchTest.java | 1 +
.../partition/replicator/fixtures/Node.java | 1 +
.../apache/ignite/internal/raft/RaftManager.java | 48 ++++++++++
.../raft/TimeAwareRaftGroupServiceFactory.java | 53 +++++++++++
.../raft/service/TimeAwareRaftGroupService.java | 2 +
.../java/org/apache/ignite/internal/raft/Loza.java | 71 ++++++++++++++
.../PhysicalTopologyAwareRaftGroupService.java | 20 ++--
.../ItDistributedConfigurationPropertiesTest.java | 1 +
.../ItDistributedConfigurationStorageTest.java | 1 +
.../runner/app/ItIgniteNodeRestartTest.java | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
21 files changed, 471 insertions(+), 74 deletions(-)
diff --git a/modules/cluster-management/build.gradle
b/modules/cluster-management/build.gradle
index b1ccf334640..74f56eab774 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -36,6 +36,7 @@ dependencies {
implementation project(':ignite-failure-handler')
implementation project(':ignite-network')
implementation project(':ignite-raft-api')
+ implementation project(':ignite-raft')
implementation project(':ignite-vault')
implementation project(':ignite-rocksdb-common')
implementation project(':ignite-storage-api')
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index e4c0d086467..f4efbce851f 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -71,9 +71,10 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.StoppingExceptionFactories;
import org.apache.ignite.internal.raft.TestLozaFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
import org.apache.ignite.internal.raft.storage.LogStorageManager;
import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -81,6 +82,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -116,6 +118,10 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
private final ComponentWorkingDir workingDir;
+ private final RaftGroupEventsClientListener eventsClientListener;
+
+ private final NoOpFailureManager failureManager;
+
Node(TestInfo testInfo, NetworkAddress addr, NodeFinder nodeFinder,
Path workDir) {
this.clusterService = clusterService(testInfo, addr.port(),
nodeFinder);
workingDir = new ComponentWorkingDir(workDir);
@@ -124,8 +130,16 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
clusterService.nodeName(),
workingDir.raftLogPath()
);
- this.raftManager = TestLozaFactory.create(clusterService,
raftConfiguration, systemLocalConfiguration, new HybridClockImpl());
- this.logicalTopology = new
LogicalTopologyImpl(clusterStateStorage, new NoOpFailureManager());
+ this.eventsClientListener = new RaftGroupEventsClientListener();
+ this.failureManager = new NoOpFailureManager();
+ this.raftManager = TestLozaFactory.create(
+ clusterService,
+ raftConfiguration,
+ systemLocalConfiguration,
+ new HybridClockImpl(),
+ eventsClientListener
+ );
+ this.logicalTopology = new
LogicalTopologyImpl(clusterStateStorage, failureManager);
}
void start() {
@@ -147,14 +161,26 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
Peer serverPeer = configuration.peer(localMember().name());
- RaftGroupService raftService;
+ var raftServiceFactory = new
PhysicalTopologyAwareRaftGroupServiceFactory(
+ clusterService,
+ eventsClientListener,
+ failureManager
+ );
+
+ TimeAwareRaftGroupService raftService;
if (serverPeer == null) {
- raftService =
raftManager.startRaftGroupService(CmgGroupId.INSTANCE, configuration, true);
+ raftService = raftManager.startTimeAwareRaftGroupService(
+ CmgGroupId.INSTANCE,
+ configuration,
+ raftServiceFactory,
+ StoppingExceptionFactories.indicateComponentStop(),
+ true
+ );
} else {
var clusterStateStorageMgr = new
ClusterStateStorageManager(clusterStateStorage);
- raftService =
raftManager.startSystemRaftGroupNodeAndWaitNodeReady(
+ raftService =
raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
configuration,
new CmgRaftGroupListener(
@@ -163,11 +189,11 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
new
ValidationManager(clusterStateStorageMgr, logicalTopology),
term -> {},
new ClusterIdHolder(),
- new NoOpFailureManager(),
+ failureManager,
config -> {}
),
RaftGroupEventsListener.noopLsnr,
- null,
+ raftServiceFactory,
RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageManager,
workingDir.metaPath())
);
}
@@ -197,11 +223,11 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
}
private CompletableFuture<Set<LogicalNode>> logicalTopologyNodes() {
- return
raftService.logicalTopology().thenApply(LogicalTopologySnapshot::nodes);
+ return
raftService.logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT).thenApply(LogicalTopologySnapshot::nodes);
}
private CompletableFuture<Set<InternalClusterNode>> validatedNodes() {
- return raftService.validatedNodes();
+ return
raftService.validatedNodes(TimeAwareRaftGroupService.NO_TIMEOUT);
}
}
@@ -229,7 +255,7 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
}
/**
- * Tests the basic scenario of {@link CmgRaftService#logicalTopology()}
when nodes are joining and leaving.
+ * Tests the basic scenario of {@link
CmgRaftService#logicalTopology(long)} when nodes are joining and leaving.
*/
@Test
void testLogicalTopology() {
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 45a0b6ecf31..26195017d9a 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -66,6 +66,7 @@ import
org.apache.ignite.internal.cluster.management.raft.CmgRaftGroupListener;
import org.apache.ignite.internal.cluster.management.raft.CmgRaftService;
import
org.apache.ignite.internal.cluster.management.raft.IllegalInitArgumentException;
import org.apache.ignite.internal.cluster.management.raft.JoinDeniedException;
+import
org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.cluster.management.raft.ValidationManager;
import
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
@@ -98,13 +99,14 @@ import
org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -173,6 +175,9 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
/** Failure processor that is used to handle critical errors. */
private final FailureProcessor failureProcessor;
+ /** Raft group events client listener for receiving leader election
notifications. */
+ private final RaftGroupEventsClientListener eventsClientListener;
+
private final ClusterIdStore clusterIdStore;
private final ClusterResetStorage clusterResetStorage;
@@ -204,6 +209,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
ValidationManager validationManager,
NodeAttributes nodeAttributes,
FailureProcessor failureProcessor,
+ RaftGroupEventsClientListener eventsClientListener,
ClusterIdStore clusterIdStore,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
MetricManager metricManager
@@ -219,6 +225,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
validationManager,
nodeAttributes,
failureProcessor,
+ eventsClientListener,
clusterIdStore,
raftGroupOptionsConfigurer,
metricManager,
@@ -238,6 +245,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
ValidationManager validationManager,
NodeAttributes nodeAttributes,
FailureProcessor failureProcessor,
+ RaftGroupEventsClientListener eventsClientListener,
ClusterIdStore clusterIdStore,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
MetricManager metricManager,
@@ -253,6 +261,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
this.localStateStorage = new LocalStateStorage(vault);
this.nodeAttributes = nodeAttributes;
this.failureProcessor = failureProcessor;
+ this.eventsClientListener = eventsClientListener;
this.clusterIdStore = clusterIdStore;
this.raftGroupOptionsConfigurer = raftGroupOptionsConfigurer;
this.metricsManager = metricManager;
@@ -336,6 +345,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
LogicalTopology logicalTopology,
NodeAttributes nodeAttributes,
FailureProcessor failureProcessor,
+ RaftGroupEventsClientListener eventsClientListener,
ClusterIdStore clusterIdStore,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
MetricManager metricManager
@@ -351,6 +361,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
new ValidationManager(new
ClusterStateStorageManager(clusterStateStorage), logicalTopology),
nodeAttributes,
failureProcessor,
+ eventsClientListener,
clusterIdStore,
raftGroupOptionsConfigurer,
metricManager
@@ -369,6 +380,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
LogicalTopology logicalTopology,
NodeAttributes nodeAttributes,
FailureProcessor failureProcessor,
+ RaftGroupEventsClientListener eventsClientListener,
ClusterIdStore clusterIdStore,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
MetricManager metricManager,
@@ -385,6 +397,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
new ValidationManager(new
ClusterStateStorageManager(clusterStateStorage), logicalTopology),
nodeAttributes,
failureProcessor,
+ eventsClientListener,
clusterIdStore,
raftGroupOptionsConfigurer,
metricManager,
@@ -804,7 +817,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* cluster state.
*/
private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
- return service.logicalTopology()
+ return service.logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT)
.thenCompose(logicalTopology -> inBusyLock(() -> {
Set<UUID> physicalTopologyIds =
clusterService.topologyService().allMembers()
.stream()
@@ -987,7 +1000,13 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
LOG.info("Starting CMG Raft service [isLearner={}, nodeNames={},
serverPeer={}]", isLearner, nodeNames, serverPeer);
- RaftGroupService service =
raftManager.startSystemRaftGroupNodeAndWaitNodeReady(
+ var raftServiceFactory = new
PhysicalTopologyAwareRaftGroupServiceFactory(
+ clusterService,
+ eventsClientListener,
+ failureProcessor
+ );
+
+ TimeAwareRaftGroupService raftGroupService =
raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
raftNodeId(serverPeer),
raftConfiguration,
new CmgRaftGroupListener(
@@ -1002,11 +1021,11 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
(term, configurationTerm, configurationIndex,
configuration, sequenceToken) -> {
onElectedAsLeader(term);
},
- null,
+ raftServiceFactory,
raftGroupOptionsConfigurer
);
- return new CmgRaftService(service,
clusterService.topologyService(), logicalTopology);
+ return new CmgRaftService(raftGroupService,
clusterService.topologyService(), logicalTopology);
} catch (NodeStoppingException e) {
throw new IgniteInternalException(NODE_STOPPING_ERR, e);
} finally {
@@ -1101,7 +1120,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
}
private void sendClusterState(CmgRaftService raftService,
Collection<InternalClusterNode> nodes) {
- raftService.logicalTopology()
+ raftService.logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT)
.thenCompose(topology -> {
// TODO https://issues.apache.org/jira/browse/IGNITE-24769
Set<InternalClusterNode> logicalTopology =
topology.nodes().stream()
@@ -1245,7 +1264,17 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* @return Future that, when complete, resolves into a list of node names
that host the Meta Storage.
*/
public CompletableFuture<Set<String>> metaStorageNodes() {
- return metaStorageInfo()
+ return metaStorageNodes(TimeAwareRaftGroupService.NO_TIMEOUT);
+ }
+
+ /**
+ * Returns a future that, when complete, resolves into a list of node
names that host the Meta Storage.
+ *
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+ * @return Future that, when complete, resolves into a list of node names
that host the Meta Storage.
+ */
+ public CompletableFuture<Set<String>> metaStorageNodes(long timeout) {
+ return metaStorageInfo(timeout)
.thenApply(MetaStorageInfo::metaStorageNodes);
}
@@ -1253,13 +1282,22 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* Returns a future that, when complete, resolves into a Meta storage info.
*/
public CompletableFuture<MetaStorageInfo> metaStorageInfo() {
+ return metaStorageInfo(TimeAwareRaftGroupService.NO_TIMEOUT);
+ }
+
+ /**
+ * Returns a future that, when complete, resolves into a Meta storage info.
+ *
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+ */
+ public CompletableFuture<MetaStorageInfo> metaStorageInfo(long timeout) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
return raftServiceAfterJoin()
- .thenCompose(CmgRaftService::readMetaStorageInfo);
+ .thenCompose(service ->
service.readMetaStorageInfo(timeout));
} finally {
busyLock.leaveBusy();
}
@@ -1308,12 +1346,22 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* @return Future that, when complete, resolves into a logical topology
snapshot.
*/
public CompletableFuture<LogicalTopologySnapshot> logicalTopology() {
+ return logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT);
+ }
+
+ /**
+ * Returns a future that, when complete, resolves into a logical topology
snapshot.
+ *
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+ * @return Future that, when complete, resolves into a logical topology
snapshot.
+ */
+ public CompletableFuture<LogicalTopologySnapshot> logicalTopology(long
timeout) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
- return
raftServiceAfterJoin().thenCompose(CmgRaftService::logicalTopology);
+ return raftServiceAfterJoin().thenCompose(service ->
service.logicalTopology(timeout));
} finally {
busyLock.leaveBusy();
}
@@ -1324,12 +1372,22 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* Logical Topology as well as nodes that only have passed the validation
step.
*/
public CompletableFuture<Set<InternalClusterNode>> validatedNodes() {
+ return validatedNodes(TimeAwareRaftGroupService.NO_TIMEOUT);
+ }
+
+ /**
+ * Returns a future that, when complete, resolves into a list of validated
nodes. This list includes all nodes currently present in the
+ * Logical Topology as well as nodes that only have passed the validation
step.
+ *
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+ */
+ public CompletableFuture<Set<InternalClusterNode>> validatedNodes(long
timeout) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
- return
raftServiceAfterJoin().thenCompose(CmgRaftService::validatedNodes);
+ return raftServiceAfterJoin().thenCompose(service ->
service.validatedNodes(timeout));
} finally {
busyLock.leaveBusy();
}
@@ -1360,7 +1418,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* @return Future that completes when the command is executed by the CMG.
*/
public CompletableFuture<Void> changeMetastorageNodes(Set<String>
newMetastorageNodes) {
- return changeMetastorageNodesInternal(newMetastorageNodes, null);
+ return changeMetastorageNodesInternal(newMetastorageNodes, null,
TimeAwareRaftGroupService.NO_TIMEOUT);
}
/**
@@ -1372,12 +1430,30 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
* @return Future that completes when the command is executed by the CMG.
*/
public CompletableFuture<Void> changeMetastorageNodes(Set<String>
newMetastorageNodes, long metastorageRepairingConfigIndex) {
- return changeMetastorageNodesInternal(newMetastorageNodes,
metastorageRepairingConfigIndex);
+ return changeMetastorageNodesInternal(newMetastorageNodes,
metastorageRepairingConfigIndex, TimeAwareRaftGroupService.NO_TIMEOUT);
+ }
+
+ /**
+ * Changes metastorage nodes in the CMG for the forceful (with repair)
Metastorage reconfiguration procedure.
+ *
+ * @param newMetastorageNodes Metastorage node names to set.
+ * @param metastorageRepairingConfigIndex Raft index in the Metastorage
group under which the forced configuration is
+ * (or will be) saved.
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+ * @return Future that completes when the command is executed by the CMG.
+ */
+ public CompletableFuture<Void> changeMetastorageNodes(
+ Set<String> newMetastorageNodes,
+ long metastorageRepairingConfigIndex,
+ long timeout
+ ) {
+ return changeMetastorageNodesInternal(newMetastorageNodes,
metastorageRepairingConfigIndex, timeout);
}
private CompletableFuture<Void> changeMetastorageNodesInternal(
Set<String> newMetastorageNodes,
- @Nullable Long metastorageRepairingConfigIndex
+ @Nullable Long metastorageRepairingConfigIndex,
+ long timeout
) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
@@ -1385,7 +1461,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
try {
return raftServiceAfterJoin()
- .thenCompose(service ->
service.changeMetastorageNodes(newMetastorageNodes,
metastorageRepairingConfigIndex));
+ .thenCompose(service ->
service.changeMetastorageNodes(newMetastorageNodes,
metastorageRepairingConfigIndex, timeout));
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index acd0c1e042d..78dec96585f 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -49,21 +49,22 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.service.RaftCommandRunner;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
import org.jetbrains.annotations.Nullable;
/**
- * A wrapper around a {@link RaftGroupService} providing helpful methods for
working with the CMG.
+ * A wrapper around a {@link TimeAwareRaftGroupService} providing helpful
methods for working with the CMG.
*/
public class CmgRaftService implements ManuallyCloseable {
private static final IgniteLogger LOG =
Loggers.forClass(CmgRaftService.class);
private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
- private final RaftGroupService raftService;
+ private final TimeAwareRaftGroupService raftService;
private final TopologyService topologyService;
@@ -72,7 +73,7 @@ public class CmgRaftService implements ManuallyCloseable {
/**
* Creates a new instance.
*/
- public CmgRaftService(RaftGroupService raftService, TopologyService
topologyService, LogicalTopology logicalTopology) {
+ public CmgRaftService(TimeAwareRaftGroupService raftService,
TopologyService topologyService, LogicalTopology logicalTopology) {
this.raftService = raftService;
this.topologyService = topologyService;
this.logicalTopology = logicalTopology;
@@ -87,7 +88,7 @@ public class CmgRaftService implements ManuallyCloseable {
Peer leader = raftService.leader();
if (leader == null) {
- return raftService.refreshLeader().thenCompose(v ->
isCurrentNodeLeader());
+ return
raftService.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v
-> isCurrentNodeLeader());
} else {
String nodeName = topologyService.localMember().name();
@@ -101,7 +102,7 @@ public class CmgRaftService implements ManuallyCloseable {
* @return Future that resolves into the current cluster state or {@code
null} if it does not exist.
*/
public CompletableFuture<ClusterState> readClusterState() {
- return raftService.run(msgFactory.readStateCommand().build())
+ return run(msgFactory.readStateCommand().build())
.thenApply(ClusterState.class::cast);
}
@@ -114,7 +115,7 @@ public class CmgRaftService implements ManuallyCloseable {
public CompletableFuture<ClusterState> initClusterState(ClusterState
clusterState) {
ClusterNodeMessage localNodeMessage =
nodeMessage(topologyService.localMember());
- return
raftService.run(msgFactory.initCmgStateCommand().node(localNodeMessage).clusterState(clusterState).build())
+ return
run(msgFactory.initCmgStateCommand().node(localNodeMessage).clusterState(clusterState).build())
.thenApply(response -> {
if (response instanceof ValidationErrorResponse) {
throw new IllegalInitArgumentException("Init CMG
request denied, reason: "
@@ -146,7 +147,7 @@ public class CmgRaftService implements ManuallyCloseable {
// Using NO_TIMEOUT because we want a node that doesn't see CMG
majority at start to hang out until someone else starts; otherwise,
// if we employ a timeout here, node-by-node starts might cause
inability to form a cluster.
- return raftService.run(command, RaftCommandRunner.NO_TIMEOUT)
+ return run(command)
.thenAccept(response -> {
if (response instanceof ValidationErrorResponse) {
var validationErrorResponse =
(ValidationErrorResponse) response;
@@ -177,7 +178,7 @@ public class CmgRaftService implements ManuallyCloseable {
ClusterNodeMessage localNodeMessage =
nodeMessage(topologyService.localMember(), attributes);
JoinReadyCommand joinReadyCommand =
msgFactory.joinReadyCommand().node(localNodeMessage).build();
- return raftService.run(joinReadyCommand, RaftCommandRunner.NO_TIMEOUT)
+ return run(joinReadyCommand)
.thenAccept(response -> {
if (response instanceof ValidationErrorResponse) {
throw new JoinDeniedException("JoinReady request
denied, reason: "
@@ -200,16 +201,17 @@ public class CmgRaftService implements ManuallyCloseable {
.nodes(nodes.stream().map(this::nodeMessage).collect(toSet()))
.build();
- return raftService.run(command);
+ return run(command);
}
/**
* Retrieves the logical topology snapshot.
*
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
* @return Logical topology snapshot.
*/
- public CompletableFuture<LogicalTopologySnapshot> logicalTopology() {
- return raftService.run(msgFactory.readLogicalTopologyCommand().build())
+ public CompletableFuture<LogicalTopologySnapshot> logicalTopology(long
timeout) {
+ return run(msgFactory.readLogicalTopologyCommand().build(), timeout)
.thenApply(LogicalTopologyResponse.class::cast)
.thenApply(LogicalTopologyResponse::logicalTopology);
}
@@ -217,9 +219,19 @@ public class CmgRaftService implements ManuallyCloseable {
/**
* Returns a future that, when complete, resolves into a list of validated
nodes. This list includes all nodes currently present in the
* Logical Topology as well as nodes that only have passed the validation
step.
+ *
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
*/
- public CompletableFuture<Set<InternalClusterNode>> validatedNodes() {
- return raftService.run(msgFactory.readValidatedNodesCommand().build());
+ public CompletableFuture<Set<InternalClusterNode>> validatedNodes(long
timeout) {
+ return run(msgFactory.readValidatedNodesCommand().build(), timeout);
+ }
+
+ private <R> CompletableFuture<R> run(Command cmd, long timeout) {
+ return raftService.run(cmd, timeout);
+ }
+
+ private <R> CompletableFuture<R> run(Command cmd) {
+ return run(cmd, TimeAwareRaftGroupService.NO_TIMEOUT);
}
/**
@@ -246,7 +258,7 @@ public class CmgRaftService implements ManuallyCloseable {
Peer leader = raftService.leader();
if (leader == null) {
- return raftService.refreshLeader().thenCompose(v -> majority());
+ return
raftService.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v
-> majority());
}
List<Peer> peers = raftService.peers();
@@ -297,7 +309,7 @@ public class CmgRaftService implements ManuallyCloseable {
List<Peer> currentLearners = raftService.learners();
if (currentLearners == null) {
- return raftService.refreshMembers(true).thenCompose(v ->
learners());
+ return raftService.refreshMembers(true,
TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v -> learners());
}
return completedFuture(currentLearners.stream()
@@ -316,7 +328,7 @@ public class CmgRaftService implements ManuallyCloseable {
List<Peer> currentLearners = raftService.learners();
if (currentLearners == null) {
- return raftService.refreshMembers(true).thenCompose(v ->
updateLearners(term));
+ return raftService.refreshMembers(true,
TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v -> updateLearners(term));
}
Set<String> currentLearnerNames = currentLearners.stream()
@@ -339,11 +351,11 @@ public class CmgRaftService implements ManuallyCloseable {
if (newLearners.isEmpty()) {
// Methods for working with learners do not support empty peer
lists for some reason.
// TODO: https://issues.apache.org/jira/browse/IGNITE-26855.
- return raftService.changePeersAndLearnersAsync(newConfiguration,
term, 0)
+ return raftService.changePeersAndLearnersAsync(newConfiguration,
term, 0, TimeAwareRaftGroupService.NO_TIMEOUT)
.thenRun(() ->
raftService.updateConfiguration(newConfiguration));
} else {
// TODO: https://issues.apache.org/jira/browse/IGNITE-26855.
- return raftService.resetLearners(newConfiguration.learners(), 0);
+ return raftService.resetLearners(newConfiguration.learners(), 0,
TimeAwareRaftGroupService.NO_TIMEOUT);
}
}
@@ -356,29 +368,37 @@ public class CmgRaftService implements ManuallyCloseable {
ChangeClusterNameCommand command =
msgFactory.changeClusterNameCommand()
.clusterName(clusterName)
.build();
- return raftService.run(command);
+ return run(command);
}
/**
* Changes Metastorage nodes.
*
+ * @param newMetastorageNodes New metastorage node names.
+ * @param metastorageRepairingConfigIndex Metastorage repairing config
index (for forceful reconfiguration).
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
* @return Future that completes when the change is finished.
*/
- public CompletableFuture<Void> changeMetastorageNodes(Set<String>
newMetastorageNodes, @Nullable Long metastorageRepairingConfigIndex) {
+ public CompletableFuture<Void> changeMetastorageNodes(
+ Set<String> newMetastorageNodes,
+ @Nullable Long metastorageRepairingConfigIndex,
+ long timeout
+ ) {
ChangeMetaStorageInfoCommand command =
msgFactory.changeMetaStorageInfoCommand()
.metaStorageNodes(Set.copyOf(newMetastorageNodes))
.metastorageRepairingConfigIndex(metastorageRepairingConfigIndex)
.build();
- return raftService.run(command);
+ return run(command, timeout);
}
/**
* Retrieves the Metastorage info.
*
- * @return Future that resolves into the metastorage info or {@code null}
if even cluster state does not exist.
+ * @param timeout Timeout in milliseconds. Use {@link
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+ * @return Future that resolves into the metastorage info or {@code null}
if cluster state does not exist.
*/
- public CompletableFuture<MetaStorageInfo> readMetaStorageInfo() {
- return raftService.run(msgFactory.readMetaStorageInfoCommand().build())
+ public CompletableFuture<MetaStorageInfo> readMetaStorageInfo(long
timeout) {
+ return run(msgFactory.readMetaStorageInfoCommand().build(), timeout)
.thenApply(MetaStorageInfo.class::cast);
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/PhysicalTopologyAwareRaftGroupServiceFactory.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/PhysicalTopologyAwareRaftGroupServiceFactory.java
new file mode 100644
index 00000000000..0bd35b54261
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/PhysicalTopologyAwareRaftGroupServiceFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.raft.ExceptionFactory;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory;
+import
org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+
+/**
+ * Factory for creating {@link PhysicalTopologyAwareRaftGroupService} instances for CMG.
+ *
+ * <p>Implements {@link TimeAwareRaftGroupServiceFactory} by delegating to
+ * {@link PhysicalTopologyAwareRaftGroupService#start}. The cluster service, Raft events client listener and failure
+ * processor are captured once at construction time and passed to every service this factory starts, together with the
+ * per-call arguments.
+ */
+public class PhysicalTopologyAwareRaftGroupServiceFactory implements
TimeAwareRaftGroupServiceFactory {
+
+ /** Cluster service handed to every created Raft group service. */
+ private final ClusterService clusterService;
+
+ /** Raft group events client listener handed to every created Raft group service. */
+ private final RaftGroupEventsClientListener eventsClientListener;
+
+ /** Failure processor handed to every created Raft group service. */
+ private final FailureProcessor failureProcessor;
+
+ /**
+ * Creates a factory whose collaborators are shared by all Raft group services it starts.
+ *
+ * @param clusterService Cluster service.
+ * @param eventsClientListener Raft events client listener.
+ * @param failureProcessor Failure processor.
+ */
+ public PhysicalTopologyAwareRaftGroupServiceFactory(
+ ClusterService clusterService,
+ RaftGroupEventsClientListener eventsClientListener,
+ FailureProcessor failureProcessor
+ ) {
+ this.clusterService = clusterService;
+ this.eventsClientListener = eventsClientListener;
+ this.failureProcessor = failureProcessor;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>The return type is narrowed to {@link PhysicalTopologyAwareRaftGroupService}. Note that
+ * {@link PhysicalTopologyAwareRaftGroupService#start} takes its arguments in a different order than this method
+ * (the Raft configuration precedes the peers), and it additionally receives this factory's captured collaborators.
+ */
+ @Override
+ public PhysicalTopologyAwareRaftGroupService startRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners peersAndLearners,
+ RaftConfiguration raftConfiguration,
+ ScheduledExecutorService raftClientExecutor,
+ Marshaller commandsMarshaller,
+ ExceptionFactory stoppingExceptionFactory
+ ) {
+ return PhysicalTopologyAwareRaftGroupService.start(
+ groupId,
+ clusterService,
+ raftConfiguration,
+ peersAndLearners,
+ raftClientExecutor,
+ eventsClientListener,
+ commandsMarshaller,
+ stoppingExceptionFactory,
+ failureProcessor
+ );
+ }
+}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
index 1dc6948e1c0..e2397280155 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
@@ -48,11 +48,12 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftManager;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import
org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -82,14 +83,15 @@ class ClusterManagementGroupManagerTest extends
BaseIgniteAbstractTest {
@Mock LogicalTopology logicalTopology,
@Mock NodeAttributes nodeAttributes,
@Mock FailureManager failureManager,
- @Mock RaftGroupService raftGroupService,
- @Mock MetricManager metricManager
+ @Mock PhysicalTopologyAwareRaftGroupService raftGroupService,
+ @Mock MetricManager metricManager,
+ @Mock RaftGroupEventsClientListener eventsClientListener
) throws NodeStoppingException {
var addr = new NetworkAddress("localhost", 10_000);
clusterService = ClusterServiceTestUtils.clusterService(testInfo,
addr.port(), new StaticNodeFinder(List.of(addr)));
- when(raftManager.startSystemRaftGroupNodeAndWaitNodeReady(any(),
any(), any(), any(), any(), any()))
+
when(raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(any(),
any(), any(), any(), any(), any()))
.thenReturn(raftGroupService);
ClusterState clusterState = cmgMessagesFactory.clusterState()
@@ -99,12 +101,11 @@ class ClusterManagementGroupManagerTest extends
BaseIgniteAbstractTest {
.version("foo")
.build();
- when(raftGroupService.run(any()))
- .thenReturn(nullCompletedFuture());
+ // General catch-all for timeout version must come before specific
matchers
when(raftGroupService.run(any(), anyLong()))
.thenReturn(nullCompletedFuture());
-
- when(raftGroupService.run(any(InitCmgStateCommand.class)))
+ // More specific matcher for InitCmgStateCommand - must come after
general matcher
+ when(raftGroupService.run(any(InitCmgStateCommand.class), anyLong()))
.thenReturn(completedFuture(clusterState));
cmgManager = new ClusterManagementGroupManager(
@@ -117,6 +118,7 @@ class ClusterManagementGroupManagerTest extends
BaseIgniteAbstractTest {
logicalTopology,
nodeAttributes,
failureManager,
+ eventsClientListener,
new ClusterIdHolder(),
RaftGroupOptionsConfigurer.EMPTY,
metricManager
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
index 6ff0f01a128..33b9ced5421 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
@@ -34,8 +34,7 @@ import
org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.raft.service.RaftCommandRunner;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.Test;
@@ -47,7 +46,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class CmgRaftServiceTest extends BaseIgniteAbstractTest {
@Mock
- private RaftGroupService raftGroupService;
+ private TimeAwareRaftGroupService raftGroupService;
@Mock
private TopologyService topologyService;
@@ -68,7 +67,7 @@ class CmgRaftServiceTest extends BaseIgniteAbstractTest {
assertThat(cmgRaftService.completeJoinCluster(new
EmptyNodeAttributes()), willCompleteSuccessfully());
- verify(raftGroupService).run(any(), eq(RaftCommandRunner.NO_TIMEOUT));
+ verify(raftGroupService).run(any(),
eq(TimeAwareRaftGroupService.NO_TIMEOUT));
}
private static class EmptyNodeAttributes implements NodeAttributes {
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 79883be747f..65fefe75e53 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.util.ReverseIterator;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.TestInfo;
/**
@@ -145,7 +146,15 @@ public class MockNode {
this.workDir.resolve("partitions/log")
);
- var raftManager = TestLozaFactory.create(clusterService,
raftConfiguration, systemLocalConfiguration, new HybridClockImpl());
+ var eventsClientListener = new RaftGroupEventsClientListener();
+
+ var raftManager = TestLozaFactory.create(
+ clusterService,
+ raftConfiguration,
+ systemLocalConfiguration,
+ new HybridClockImpl(),
+ eventsClientListener
+ );
var clusterStateStorage =
new
RocksDbClusterStateStorage(this.workDir.resolve("cmg/data"),
clusterService.nodeName());
@@ -179,6 +188,7 @@ public class MockNode {
new LogicalTopologyImpl(clusterStateStorage, failureManager),
collector,
failureManager,
+ eventsClientListener,
clusterIdHolder,
cmgRaftConfigurer,
new NoOpMetricManager(),
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index fc0e4728fa8..1804d6dc70d 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1317,6 +1317,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
logicalTopology,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
failureManager,
+ raftGroupEventsClientListener,
clusterIdService,
cmgRaftConfigurer,
metricManager
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 5fed98e9214..034c552ddd9 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -197,6 +197,7 @@ abstract class ItMetaStorageMultipleNodesAbstractTest
extends IgniteAbstractTest
logicalTopology,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
failureManager,
+ raftGroupEventsClientListener,
new ClusterIdHolder(),
cmgRaftConfigurator,
new NoOpMetricManager()
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index a2868e7655a..2ec4b15594d 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -215,6 +215,7 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
logicalTopology,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
failureManager,
+ raftGroupEventsClientListener,
new ClusterIdHolder(),
cmgRaftConfigurer,
metricManager
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 65b9de944a1..55efc71c922 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -441,6 +441,7 @@ public class Node {
logicalTopology,
new NodeAttributesCollector(nodeAttributesConfiguration,
storageConfiguration),
failureManager,
+ raftGroupEventsClientListener,
clusterIdHolder,
cmgRaftConfigurer,
metricManager
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index 595ffccb24b..16ddb37eec7 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.jetbrains.annotations.Nullable;
@@ -83,6 +84,31 @@ public interface RaftManager extends IgniteComponent {
RaftGroupOptionsConfigurer groupOptionsConfigurer
) throws NodeStoppingException;
+ /**
+ * Starts a Raft group and a time-aware Raft service on the current node, using the given service factory.
+ *
+ * <p>Synchronously waits for the Raft log to be applied.
+ *
+ * <p>This method is similar to {@link #startSystemRaftGroupNodeAndWaitNodeReady} but creates a
+ * {@link TimeAwareRaftGroupService} instead of a {@link RaftGroupService}.
+ *
+ * @param nodeId Raft node ID.
+ * @param configuration Peers and Learners of the Raft group.
+ * @param lsnr Raft group listener.
+ * @param eventsLsnr Raft group events listener.
+ * @param factory Service factory for creating time-aware Raft group service.
+ * @param groupOptionsConfigurer Configures raft log and snapshot storages.
+ * @return Time-aware Raft group service created by {@code factory}.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ TimeAwareRaftGroupService
startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr,
+ TimeAwareRaftGroupServiceFactory factory,
+ RaftGroupOptionsConfigurer groupOptionsConfigurer
+ ) throws NodeStoppingException;
+
/**
* Stops a given local Raft node.
*
@@ -141,6 +167,28 @@ public interface RaftManager extends IgniteComponent {
boolean isSystemGroup
) throws NodeStoppingException;
+ /**
+ * Creates a time-aware Raft group service providing operations on a Raft group, using the given factory.
+ *
+ * <p>This method is similar to {@link #startRaftGroupService(ReplicationGroupId, PeersAndLearners,
+ * RaftServiceFactory, Marshaller, ExceptionFactory, boolean)} but creates a {@link TimeAwareRaftGroupService}
+ * instead of a {@link RaftGroupService}. Unlike that overload, no commands {@link Marshaller} is accepted here;
+ * the implementation chooses the marshaller it passes to {@code factory}.
+ *
+ * @param groupId Raft group ID.
+ * @param configuration Peers and Learners of the Raft group.
+ * @param factory Factory that should be used to create raft service.
+ * @param stoppingExceptionFactory Exception factory used to create exceptions thrown to indicate that the object
+ * is being stopped.
+ * @param isSystemGroup Whether the group service is for a system group or not.
+ * @return Time-aware Raft group service.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ TimeAwareRaftGroupService startTimeAwareRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners configuration,
+ TimeAwareRaftGroupServiceFactory factory,
+ ExceptionFactory stoppingExceptionFactory,
+ boolean isSystemGroup
+ ) throws NodeStoppingException;
+
/**
* Destroys Raft group node storages (log storage, metadata storage and
snapshots storage).
*
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/TimeAwareRaftGroupServiceFactory.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/TimeAwareRaftGroupServiceFactory.java
new file mode 100644
index 00000000000..5aaf3273896
--- /dev/null
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/TimeAwareRaftGroupServiceFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/**
+ * Factory for creating {@link TimeAwareRaftGroupService} instances.
+ *
+ * <p>This factory is similar to {@link RaftServiceFactory} but creates services that implement the
+ * {@link TimeAwareRaftGroupService} interface instead of requiring the service to extend
+ * {@link org.apache.ignite.internal.raft.service.RaftGroupService}.
+ *
+ * <p>Being a functional interface, it can also be supplied as a lambda or method reference.
+ */
+@FunctionalInterface
+public interface TimeAwareRaftGroupServiceFactory {
+ /**
+ * Creates a time-aware Raft group service.
+ *
+ * @param groupId Raft group ID.
+ * @param peersAndLearners Peers and learners of the Raft group.
+ * @param raftConfiguration Raft configuration.
+ * @param raftClientExecutor Executor for the Raft client.
+ * @param commandsMarshaller Marshaller that should be used to serialize commands.
+ * @param stoppingExceptionFactory Exception factory used to create exceptions thrown to indicate that the object
+ * is being stopped.
+ * @return New time-aware Raft client.
+ */
+ TimeAwareRaftGroupService startRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners peersAndLearners,
+ RaftConfiguration raftConfiguration,
+ ScheduledExecutorService raftClientExecutor,
+ Marshaller commandsMarshaller,
+ ExceptionFactory stoppingExceptionFactory
+ );
+}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
index 25c0e3f8063..b1059092bb4 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
@@ -52,6 +52,8 @@ import org.jetbrains.annotations.Nullable;
* <p>All async operations provided by the service are not cancellable.
*/
public interface TimeAwareRaftGroupService {
+ /** Timeout value meaning that an operation never times out, i.e. it waits indefinitely. */
+ long NO_TIMEOUT = Long.MAX_VALUE;
/**
* Runs a command on a replication group leader with the given timeout.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 4f3fcb90c1f..e779040a050 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.raft.server.impl.GroupStoragesContextResolver;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents;
import
org.apache.ignite.internal.raft.storage.impl.NoopGroupStoragesDestructionIntents;
import org.apache.ignite.internal.raft.storage.impl.StorageDestructionIntent;
@@ -365,6 +366,50 @@ public class Loza implements RaftManager {
}
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Starts the Raft node as a system group under the busy lock, then creates the client-side service with
+ * {@code factory}, using this manager's Raft configuration, executor and commands marshaller.
+ *
+ * <p>NOTE(review): the interface contract says this method synchronously waits for the Raft log to be applied,
+ * but no readiness wait is visible in this body. Confirm that {@code startRaftGroupNodeInternal} (or the
+ * factory) performs that wait, as the non-time-aware variant is expected to.
+ */
+ @Override
+ public TimeAwareRaftGroupService
startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr,
+ TimeAwareRaftGroupServiceFactory factory,
+ RaftGroupOptionsConfigurer groupOptionsConfigurer
+ ) throws NodeStoppingException {
+ // Refuse to start anything once the component is stopping; the lock is held for the whole call.
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ // The system-group flag is set first, so the caller's configurer below runs on top of it.
+ RaftGroupOptions raftGroupOptions = RaftGroupOptions.defaults()
+ .setSystemGroup(true);
+
+ groupOptionsConfigurer.configure(raftGroupOptions);
+
+ // Start the raft node first (server side).
+ startRaftGroupNodeInternal(
+ nodeId,
+ configuration,
+ lsnr,
+ eventsLsnr,
+ raftGroupOptions,
+ // Presumably the legacy RaftServiceFactory slot; the client service is created separately below - confirm.
+ null,
+ StoppingExceptionFactories.indicateNodeStop()
+ );
+
+ // Create the time-aware service using the provided factory.
+ return factory.startRaftGroupService(
+ nodeId.groupId(),
+ configuration,
+ raftConfiguration,
+ executor,
+ opts.getCommandsMarshaller(),
+ StoppingExceptionFactories.indicateNodeStop()
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
@Override
public RaftGroupService startRaftGroupService(ReplicationGroupId groupId,
PeersAndLearners configuration, boolean isSystemGroup)
throws NodeStoppingException {
@@ -422,6 +467,32 @@ public class Loza implements RaftManager {
}
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Delegates to {@code factory} under the busy lock, supplying this manager's Raft configuration, executor and
+ * commands marshaller ({@code opts.getCommandsMarshaller()}).
+ *
+ * <p>NOTE(review): {@code isSystemGroup} is not referenced anywhere in this body. Confirm whether it is
+ * intentionally ignored or should influence service creation, as the parameter's documentation implies.
+ */
+ @Override
+ public TimeAwareRaftGroupService startTimeAwareRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners configuration,
+ TimeAwareRaftGroupServiceFactory factory,
+ ExceptionFactory stoppingExceptionFactory,
+ boolean isSystemGroup
+ ) throws NodeStoppingException {
+ // Refuse to create a client once the component is stopping.
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ return factory.startRaftGroupService(
+ groupId,
+ configuration,
+ raftConfiguration,
+ executor,
+ opts.getCommandsMarshaller(),
+ stoppingExceptionFactory
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
@Override
public void destroyRaftNodeStorages(RaftNodeId nodeId,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer)
throws NodeStoppingException {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
index 7c7ec1a6434..6efa0bb7929 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.failure.FailureContext;
-import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
@@ -84,8 +84,8 @@ public class PhysicalTopologyAwareRaftGroupService implements
TimeAwareRaftGroup
/** General leader election listener. */
private final ServerEventHandler generalLeaderElectionListener;
- /** Failure manager. */
- private final FailureManager failureManager;
+ /** Failure processor. */
+ private final FailureProcessor failureProcessor;
/** Cluster service. */
private final ClusterService clusterService;
@@ -117,7 +117,7 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
/**
* Constructor.
*
- * @param failureManager Failure manager.
+ * @param failureProcessor Failure processor.
* @param clusterService Cluster service.
* @param executor Executor to invoke RPC requests and notify listeners.
* @param raftConfiguration RAFT configuration.
@@ -125,7 +125,7 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
*/
private PhysicalTopologyAwareRaftGroupService(
ReplicationGroupId groupId,
- FailureManager failureManager,
+ FailureProcessor failureProcessor,
ClusterService clusterService,
ScheduledExecutorService executor,
RaftConfiguration raftConfiguration,
@@ -135,7 +135,7 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
RaftGroupEventsClientListener eventsClientListener
) {
this.groupId = groupId;
- this.failureManager = failureManager;
+ this.failureProcessor = failureProcessor;
this.clusterService = clusterService;
this.stoppingExceptionFactory = stoppingExceptionFactory;
this.peers = List.copyOf(configuration.peers());
@@ -247,7 +247,7 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
return subscriptionMessageSender.send(member,
msg).whenComplete((isSent, err) -> {
if (err != null) {
- failureManager.process(new FailureContext(err, "Could not
change subscription to leader updates [grp="
+ failureProcessor.process(new FailureContext(err, "Could
not change subscription to leader updates [grp="
+ groupId() + "]."));
}
@@ -270,7 +270,7 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
* @param eventsClientListener Listener for RAFT group events.
* @param cmdMarshaller Marshaller for RAFT commands.
* @param stoppingExceptionFactory Factory for creating stopping
exceptions.
- * @param failureManager Manager for handling failures.
+ * @param failureProcessor Processor for handling failures.
* @return A new instance of PhysicalTopologyAwareRaftGroupService.
*/
public static PhysicalTopologyAwareRaftGroupService start(
@@ -282,11 +282,11 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
RaftGroupEventsClientListener eventsClientListener,
Marshaller cmdMarshaller,
ExceptionFactory stoppingExceptionFactory,
- FailureManager failureManager
+ FailureProcessor failureProcessor
) {
return new PhysicalTopologyAwareRaftGroupService(
groupId,
- failureManager,
+ failureProcessor,
cluster,
executor,
raftConfiguration,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index e52804085a9..a3362dd088c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -220,6 +220,7 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
logicalTopology,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
failureManager,
+ raftGroupEventsClientListener,
new ClusterIdHolder(),
cmgRaftConfigurer,
metricManager
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 96f6143aea7..383739ea49c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -194,6 +194,7 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
logicalTopology,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
failureManager,
+ raftGroupEventsClientListener,
new ClusterIdHolder(),
cmgRaftConfigurer,
metricManager
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7eff453dd2a..6d136c98722 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -468,6 +468,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
logicalTopology,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
failureProcessor,
+ raftGroupEventsClientListener,
clusterIdService,
cmgRaftConfigurer,
metricManager
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 59d56e84289..a3ccf925804 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -769,6 +769,7 @@ public class IgniteImpl implements Ignite {
validationManager,
nodeAttributesCollector,
failureManager,
+ raftGroupEventsClientListener,
clusterIdService,
cmgRaftConfigurer,
metricManager