This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fc56a61e82 IGNITE-27157 Adapt CMG services to use timeout (#7509)
5fc56a61e82 is described below

commit 5fc56a61e82f5de84ebdc8692797d854f7ec6cb4
Author: Cyrill <[email protected]>
AuthorDate: Tue Mar 17 13:22:47 2026 +0300

    IGNITE-27157 Adapt CMG services to use timeout (#7509)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 modules/cluster-management/build.gradle            |   1 +
 .../management/raft/ItCmgRaftServiceTest.java      |  48 +++++++---
 .../management/ClusterManagementGroupManager.java  | 104 ++++++++++++++++++---
 .../cluster/management/raft/CmgRaftService.java    |  72 ++++++++------
 ...ysicalTopologyAwareRaftGroupServiceFactory.java |  81 ++++++++++++++++
 .../ClusterManagementGroupManagerTest.java         |  18 ++--
 .../management/raft/CmgRaftServiceTest.java        |   7 +-
 .../internal/cluster/management/MockNode.java      |  12 ++-
 .../rebalance/ItRebalanceDistributedTest.java      |   1 +
 .../ItMetaStorageMultipleNodesAbstractTest.java    |   1 +
 .../metastorage/impl/ItMetaStorageWatchTest.java   |   1 +
 .../partition/replicator/fixtures/Node.java        |   1 +
 .../apache/ignite/internal/raft/RaftManager.java   |  48 ++++++++++
 .../raft/TimeAwareRaftGroupServiceFactory.java     |  53 +++++++++++
 .../raft/service/TimeAwareRaftGroupService.java    |   2 +
 .../java/org/apache/ignite/internal/raft/Loza.java |  71 ++++++++++++++
 .../PhysicalTopologyAwareRaftGroupService.java     |  20 ++--
 .../ItDistributedConfigurationPropertiesTest.java  |   1 +
 .../ItDistributedConfigurationStorageTest.java     |   1 +
 .../runner/app/ItIgniteNodeRestartTest.java        |   1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 +
 21 files changed, 471 insertions(+), 74 deletions(-)

diff --git a/modules/cluster-management/build.gradle b/modules/cluster-management/build.gradle
index b1ccf334640..74f56eab774 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -36,6 +36,7 @@ dependencies {
     implementation project(':ignite-failure-handler')
     implementation project(':ignite-network')
     implementation project(':ignite-raft-api')
+    implementation project(':ignite-raft')
     implementation project(':ignite-vault')
     implementation project(':ignite-rocksdb-common')
     implementation project(':ignite-storage-api')
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index e4c0d086467..f4efbce851f 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -71,9 +71,10 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.StoppingExceptionFactories;
 import org.apache.ignite.internal.raft.TestLozaFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
 import org.apache.ignite.internal.raft.storage.LogStorageManager;
 import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -81,6 +82,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -116,6 +118,10 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest {
 
         private final ComponentWorkingDir workingDir;
 
+        private final RaftGroupEventsClientListener eventsClientListener;
+
+        private final NoOpFailureManager failureManager;
+
         Node(TestInfo testInfo, NetworkAddress addr, NodeFinder nodeFinder, 
Path workDir) {
             this.clusterService = clusterService(testInfo, addr.port(), 
nodeFinder);
             workingDir = new ComponentWorkingDir(workDir);
@@ -124,8 +130,16 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest {
                     clusterService.nodeName(),
                     workingDir.raftLogPath()
             );
-            this.raftManager = TestLozaFactory.create(clusterService, 
raftConfiguration, systemLocalConfiguration, new HybridClockImpl());
-            this.logicalTopology = new 
LogicalTopologyImpl(clusterStateStorage, new NoOpFailureManager());
+            this.eventsClientListener = new RaftGroupEventsClientListener();
+            this.failureManager = new NoOpFailureManager();
+            this.raftManager = TestLozaFactory.create(
+                    clusterService,
+                    raftConfiguration,
+                    systemLocalConfiguration,
+                    new HybridClockImpl(),
+                    eventsClientListener
+            );
+            this.logicalTopology = new 
LogicalTopologyImpl(clusterStateStorage, failureManager);
         }
 
         void start() {
@@ -147,14 +161,26 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest {
 
                 Peer serverPeer = configuration.peer(localMember().name());
 
-                RaftGroupService raftService;
+                var raftServiceFactory = new 
PhysicalTopologyAwareRaftGroupServiceFactory(
+                        clusterService,
+                        eventsClientListener,
+                        failureManager
+                );
+
+                TimeAwareRaftGroupService raftService;
 
                 if (serverPeer == null) {
-                    raftService = 
raftManager.startRaftGroupService(CmgGroupId.INSTANCE, configuration, true);
+                    raftService = raftManager.startTimeAwareRaftGroupService(
+                            CmgGroupId.INSTANCE,
+                            configuration,
+                            raftServiceFactory,
+                            StoppingExceptionFactories.indicateComponentStop(),
+                            true
+                    );
                 } else {
                     var clusterStateStorageMgr = new 
ClusterStateStorageManager(clusterStateStorage);
 
-                    raftService = 
raftManager.startSystemRaftGroupNodeAndWaitNodeReady(
+                    raftService = 
raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
                             new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
                             configuration,
                             new CmgRaftGroupListener(
@@ -163,11 +189,11 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest {
                                     new 
ValidationManager(clusterStateStorageMgr, logicalTopology),
                                     term -> {},
                                     new ClusterIdHolder(),
-                                    new NoOpFailureManager(),
+                                    failureManager,
                                     config -> {}
                             ),
                             RaftGroupEventsListener.noopLsnr,
-                            null,
+                            raftServiceFactory,
                             
RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageManager, 
workingDir.metaPath())
                     );
                 }
@@ -197,11 +223,11 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest {
         }
 
         private CompletableFuture<Set<LogicalNode>> logicalTopologyNodes() {
-            return 
raftService.logicalTopology().thenApply(LogicalTopologySnapshot::nodes);
+            return 
raftService.logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT).thenApply(LogicalTopologySnapshot::nodes);
         }
 
         private CompletableFuture<Set<InternalClusterNode>> validatedNodes() {
-            return raftService.validatedNodes();
+            return 
raftService.validatedNodes(TimeAwareRaftGroupService.NO_TIMEOUT);
         }
     }
 
@@ -229,7 +255,7 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest {
     }
 
     /**
-     * Tests the basic scenario of {@link CmgRaftService#logicalTopology()} 
when nodes are joining and leaving.
+     * Tests the basic scenario of {@link 
CmgRaftService#logicalTopology(long)} when nodes are joining and leaving.
      */
     @Test
     void testLogicalTopology() {
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 45a0b6ecf31..26195017d9a 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.cluster.management.raft.CmgRaftGroupListener;
 import org.apache.ignite.internal.cluster.management.raft.CmgRaftService;
 import 
org.apache.ignite.internal.cluster.management.raft.IllegalInitArgumentException;
 import org.apache.ignite.internal.cluster.management.raft.JoinDeniedException;
+import 
org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.cluster.management.raft.ValidationManager;
 import 
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
@@ -98,13 +99,14 @@ import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -173,6 +175,9 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
     /** Failure processor that is used to handle critical errors. */
     private final FailureProcessor failureProcessor;
 
+    /** Raft group events client listener for receiving leader election 
notifications. */
+    private final RaftGroupEventsClientListener eventsClientListener;
+
     private final ClusterIdStore clusterIdStore;
 
     private final ClusterResetStorage clusterResetStorage;
@@ -204,6 +209,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
             ValidationManager validationManager,
             NodeAttributes nodeAttributes,
             FailureProcessor failureProcessor,
+            RaftGroupEventsClientListener eventsClientListener,
             ClusterIdStore clusterIdStore,
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
             MetricManager metricManager
@@ -219,6 +225,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
                 validationManager,
                 nodeAttributes,
                 failureProcessor,
+                eventsClientListener,
                 clusterIdStore,
                 raftGroupOptionsConfigurer,
                 metricManager,
@@ -238,6 +245,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
             ValidationManager validationManager,
             NodeAttributes nodeAttributes,
             FailureProcessor failureProcessor,
+            RaftGroupEventsClientListener eventsClientListener,
             ClusterIdStore clusterIdStore,
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
             MetricManager metricManager,
@@ -253,6 +261,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
         this.localStateStorage = new LocalStateStorage(vault);
         this.nodeAttributes = nodeAttributes;
         this.failureProcessor = failureProcessor;
+        this.eventsClientListener = eventsClientListener;
         this.clusterIdStore = clusterIdStore;
         this.raftGroupOptionsConfigurer = raftGroupOptionsConfigurer;
         this.metricsManager = metricManager;
@@ -336,6 +345,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
             LogicalTopology logicalTopology,
             NodeAttributes nodeAttributes,
             FailureProcessor failureProcessor,
+            RaftGroupEventsClientListener eventsClientListener,
             ClusterIdStore clusterIdStore,
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
             MetricManager metricManager
@@ -351,6 +361,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
                 new ValidationManager(new 
ClusterStateStorageManager(clusterStateStorage), logicalTopology),
                 nodeAttributes,
                 failureProcessor,
+                eventsClientListener,
                 clusterIdStore,
                 raftGroupOptionsConfigurer,
                 metricManager
@@ -369,6 +380,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
             LogicalTopology logicalTopology,
             NodeAttributes nodeAttributes,
             FailureProcessor failureProcessor,
+            RaftGroupEventsClientListener eventsClientListener,
             ClusterIdStore clusterIdStore,
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
             MetricManager metricManager,
@@ -385,6 +397,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
                 new ValidationManager(new 
ClusterStateStorageManager(clusterStateStorage), logicalTopology),
                 nodeAttributes,
                 failureProcessor,
+                eventsClientListener,
                 clusterIdStore,
                 raftGroupOptionsConfigurer,
                 metricManager,
@@ -804,7 +817,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
      * cluster state.
      */
     private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
-        return service.logicalTopology()
+        return service.logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT)
                 .thenCompose(logicalTopology -> inBusyLock(() -> {
                     Set<UUID> physicalTopologyIds = 
clusterService.topologyService().allMembers()
                             .stream()
@@ -987,7 +1000,13 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
 
             LOG.info("Starting CMG Raft service [isLearner={}, nodeNames={}, 
serverPeer={}]", isLearner, nodeNames, serverPeer);
 
-            RaftGroupService service = 
raftManager.startSystemRaftGroupNodeAndWaitNodeReady(
+            var raftServiceFactory = new 
PhysicalTopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    eventsClientListener,
+                    failureProcessor
+            );
+
+            TimeAwareRaftGroupService raftGroupService = 
raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
                     raftNodeId(serverPeer),
                     raftConfiguration,
                     new CmgRaftGroupListener(
@@ -1002,11 +1021,11 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
                     (term, configurationTerm, configurationIndex, 
configuration, sequenceToken) -> {
                         onElectedAsLeader(term);
                     },
-                    null,
+                    raftServiceFactory,
                     raftGroupOptionsConfigurer
             );
 
-            return new CmgRaftService(service, 
clusterService.topologyService(), logicalTopology);
+            return new CmgRaftService(raftGroupService, 
clusterService.topologyService(), logicalTopology);
         } catch (NodeStoppingException e) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, e);
         } finally {
@@ -1101,7 +1120,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
     }
 
     private void sendClusterState(CmgRaftService raftService, 
Collection<InternalClusterNode> nodes) {
-        raftService.logicalTopology()
+        raftService.logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT)
                 .thenCompose(topology -> {
                     // TODO https://issues.apache.org/jira/browse/IGNITE-24769
                     Set<InternalClusterNode> logicalTopology = 
topology.nodes().stream()
@@ -1245,7 +1264,17 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
      * @return Future that, when complete, resolves into a list of node names 
that host the Meta Storage.
      */
     public CompletableFuture<Set<String>> metaStorageNodes() {
-        return metaStorageInfo()
+        return metaStorageNodes(TimeAwareRaftGroupService.NO_TIMEOUT);
+    }
+
+    /**
+     * Returns a future that, when complete, resolves into a list of node 
names that host the Meta Storage.
+     *
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+     * @return Future that, when complete, resolves into a list of node names 
that host the Meta Storage.
+     */
+    public CompletableFuture<Set<String>> metaStorageNodes(long timeout) {
+        return metaStorageInfo(timeout)
                 .thenApply(MetaStorageInfo::metaStorageNodes);
     }
 
@@ -1253,13 +1282,22 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
      * Returns a future that, when complete, resolves into a Meta storage info.
      */
     public CompletableFuture<MetaStorageInfo> metaStorageInfo() {
+        return metaStorageInfo(TimeAwareRaftGroupService.NO_TIMEOUT);
+    }
+
+    /**
+     * Returns a future that, when complete, resolves into a Meta storage info.
+     *
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+     */
+    public CompletableFuture<MetaStorageInfo> metaStorageInfo(long timeout) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new NodeStoppingException());
         }
 
         try {
             return raftServiceAfterJoin()
-                    .thenCompose(CmgRaftService::readMetaStorageInfo);
+                    .thenCompose(service -> 
service.readMetaStorageInfo(timeout));
         } finally {
             busyLock.leaveBusy();
         }
@@ -1308,12 +1346,22 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
      * @return Future that, when complete, resolves into a logical topology 
snapshot.
      */
     public CompletableFuture<LogicalTopologySnapshot> logicalTopology() {
+        return logicalTopology(TimeAwareRaftGroupService.NO_TIMEOUT);
+    }
+
+    /**
+     * Returns a future that, when complete, resolves into a logical topology 
snapshot.
+     *
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+     * @return Future that, when complete, resolves into a logical topology 
snapshot.
+     */
+    public CompletableFuture<LogicalTopologySnapshot> logicalTopology(long 
timeout) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new NodeStoppingException());
         }
 
         try {
-            return 
raftServiceAfterJoin().thenCompose(CmgRaftService::logicalTopology);
+            return raftServiceAfterJoin().thenCompose(service -> 
service.logicalTopology(timeout));
         } finally {
             busyLock.leaveBusy();
         }
@@ -1324,12 +1372,22 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
      * Logical Topology as well as nodes that only have passed the validation 
step.
      */
     public CompletableFuture<Set<InternalClusterNode>> validatedNodes() {
+        return validatedNodes(TimeAwareRaftGroupService.NO_TIMEOUT);
+    }
+
+    /**
+     * Returns a future that, when complete, resolves into a list of validated 
nodes. This list includes all nodes currently present in the
+     * Logical Topology as well as nodes that only have passed the validation 
step.
+     *
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+     */
+    public CompletableFuture<Set<InternalClusterNode>> validatedNodes(long 
timeout) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new NodeStoppingException());
         }
 
         try {
-            return 
raftServiceAfterJoin().thenCompose(CmgRaftService::validatedNodes);
+            return raftServiceAfterJoin().thenCompose(service -> 
service.validatedNodes(timeout));
         } finally {
             busyLock.leaveBusy();
         }
@@ -1360,7 +1418,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
      * @return Future that completes when the command is executed by the CMG.
      */
     public CompletableFuture<Void> changeMetastorageNodes(Set<String> 
newMetastorageNodes) {
-        return changeMetastorageNodesInternal(newMetastorageNodes, null);
+        return changeMetastorageNodesInternal(newMetastorageNodes, null, 
TimeAwareRaftGroupService.NO_TIMEOUT);
     }
 
     /**
@@ -1372,12 +1430,30 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
      * @return Future that completes when the command is executed by the CMG.
      */
     public CompletableFuture<Void> changeMetastorageNodes(Set<String> 
newMetastorageNodes, long metastorageRepairingConfigIndex) {
-        return changeMetastorageNodesInternal(newMetastorageNodes, 
metastorageRepairingConfigIndex);
+        return changeMetastorageNodesInternal(newMetastorageNodes, 
metastorageRepairingConfigIndex, TimeAwareRaftGroupService.NO_TIMEOUT);
+    }
+
+    /**
+     * Changes metastorage nodes in the CMG for the forceful (with repair) 
Metastorage reconfiguration procedure.
+     *
+     * @param newMetastorageNodes Metastorage node names to set.
+     * @param metastorageRepairingConfigIndex Raft index in the Metastorage 
group under which the forced configuration is
+     *     (or will be) saved.
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+     * @return Future that completes when the command is executed by the CMG.
+     */
+    public CompletableFuture<Void> changeMetastorageNodes(
+            Set<String> newMetastorageNodes,
+            long metastorageRepairingConfigIndex,
+            long timeout
+    ) {
+        return changeMetastorageNodesInternal(newMetastorageNodes, 
metastorageRepairingConfigIndex, timeout);
     }
 
     private CompletableFuture<Void> changeMetastorageNodesInternal(
             Set<String> newMetastorageNodes,
-            @Nullable Long metastorageRepairingConfigIndex
+            @Nullable Long metastorageRepairingConfigIndex,
+            long timeout
     ) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new NodeStoppingException());
@@ -1385,7 +1461,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
 
         try {
             return raftServiceAfterJoin()
-                    .thenCompose(service -> 
service.changeMetastorageNodes(newMetastorageNodes, 
metastorageRepairingConfigIndex));
+                    .thenCompose(service -> 
service.changeMetastorageNodes(newMetastorageNodes, 
metastorageRepairingConfigIndex, timeout));
         } finally {
             busyLock.leaveBusy();
         }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index acd0c1e042d..78dec96585f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -49,21 +49,22 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.service.RaftCommandRunner;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * A wrapper around a {@link RaftGroupService} providing helpful methods for 
working with the CMG.
+ * A wrapper around a {@link TimeAwareRaftGroupService} providing helpful 
methods for working with the CMG.
  */
 public class CmgRaftService implements ManuallyCloseable {
     private static final IgniteLogger LOG = 
Loggers.forClass(CmgRaftService.class);
 
     private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
 
-    private final RaftGroupService raftService;
+    private final TimeAwareRaftGroupService raftService;
 
     private final TopologyService topologyService;
 
@@ -72,7 +73,7 @@ public class CmgRaftService implements ManuallyCloseable {
     /**
      * Creates a new instance.
      */
-    public CmgRaftService(RaftGroupService raftService, TopologyService 
topologyService, LogicalTopology logicalTopology) {
+    public CmgRaftService(TimeAwareRaftGroupService raftService, 
TopologyService topologyService, LogicalTopology logicalTopology) {
         this.raftService = raftService;
         this.topologyService = topologyService;
         this.logicalTopology = logicalTopology;
@@ -87,7 +88,7 @@ public class CmgRaftService implements ManuallyCloseable {
         Peer leader = raftService.leader();
 
         if (leader == null) {
-            return raftService.refreshLeader().thenCompose(v -> 
isCurrentNodeLeader());
+            return raftService.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v -> isCurrentNodeLeader());
         } else {
             String nodeName = topologyService.localMember().name();
 
@@ -101,7 +102,7 @@ public class CmgRaftService implements ManuallyCloseable {
      * @return Future that resolves into the current cluster state or {@code 
null} if it does not exist.
      */
     public CompletableFuture<ClusterState> readClusterState() {
-        return raftService.run(msgFactory.readStateCommand().build())
+        return run(msgFactory.readStateCommand().build())
                 .thenApply(ClusterState.class::cast);
     }
 
@@ -114,7 +115,7 @@ public class CmgRaftService implements ManuallyCloseable {
     public CompletableFuture<ClusterState> initClusterState(ClusterState 
clusterState) {
         ClusterNodeMessage localNodeMessage = 
nodeMessage(topologyService.localMember());
 
-        return 
raftService.run(msgFactory.initCmgStateCommand().node(localNodeMessage).clusterState(clusterState).build())
+        return 
run(msgFactory.initCmgStateCommand().node(localNodeMessage).clusterState(clusterState).build())
                 .thenApply(response -> {
                     if (response instanceof ValidationErrorResponse) {
                         throw new IllegalInitArgumentException("Init CMG 
request denied, reason: "
@@ -146,7 +147,7 @@ public class CmgRaftService implements ManuallyCloseable {
 
         // Using NO_TIMEOUT because we want a node that doesn't see CMG 
majority at start to hang out until someone else starts; otherwise,
         // if we employ a timeout here, node-by-node starts might cause 
inability to form a cluster.
-        return raftService.run(command, RaftCommandRunner.NO_TIMEOUT)
+        return run(command)
                 .thenAccept(response -> {
                     if (response instanceof ValidationErrorResponse) {
                         var validationErrorResponse = 
(ValidationErrorResponse) response;
@@ -177,7 +178,7 @@ public class CmgRaftService implements ManuallyCloseable {
         ClusterNodeMessage localNodeMessage = 
nodeMessage(topologyService.localMember(), attributes);
 
         JoinReadyCommand joinReadyCommand = 
msgFactory.joinReadyCommand().node(localNodeMessage).build();
-        return raftService.run(joinReadyCommand, RaftCommandRunner.NO_TIMEOUT)
+        return run(joinReadyCommand)
                 .thenAccept(response -> {
                     if (response instanceof ValidationErrorResponse) {
                         throw new JoinDeniedException("JoinReady request 
denied, reason: "
@@ -200,16 +201,17 @@ public class CmgRaftService implements ManuallyCloseable {
                 .nodes(nodes.stream().map(this::nodeMessage).collect(toSet()))
                 .build();
 
-        return raftService.run(command);
+        return run(command);
     }
 
     /**
      * Retrieves the logical topology snapshot.
      *
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
      * @return Logical topology snapshot.
      */
-    public CompletableFuture<LogicalTopologySnapshot> logicalTopology() {
-        return raftService.run(msgFactory.readLogicalTopologyCommand().build())
+    public CompletableFuture<LogicalTopologySnapshot> logicalTopology(long 
timeout) {
+        return run(msgFactory.readLogicalTopologyCommand().build(), timeout)
                 .thenApply(LogicalTopologyResponse.class::cast)
                 .thenApply(LogicalTopologyResponse::logicalTopology);
     }
@@ -217,9 +219,19 @@ public class CmgRaftService implements ManuallyCloseable {
     /**
      * Returns a future that, when complete, resolves into a list of validated 
nodes. This list includes all nodes currently present in the
      * Logical Topology as well as nodes that only have passed the validation 
step.
+     *
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
      */
-    public CompletableFuture<Set<InternalClusterNode>> validatedNodes() {
-        return raftService.run(msgFactory.readValidatedNodesCommand().build());
+    public CompletableFuture<Set<InternalClusterNode>> validatedNodes(long 
timeout) {
+        return run(msgFactory.readValidatedNodesCommand().build(), timeout);
+    }
+
+    private <R> CompletableFuture<R> run(Command cmd, long timeout) {
+        return raftService.run(cmd, timeout);
+    }
+
+    private <R> CompletableFuture<R> run(Command cmd) {
+        return run(cmd, TimeAwareRaftGroupService.NO_TIMEOUT);
     }
 
     /**
@@ -246,7 +258,7 @@ public class CmgRaftService implements ManuallyCloseable {
         Peer leader = raftService.leader();
 
         if (leader == null) {
-            return raftService.refreshLeader().thenCompose(v -> majority());
+            return 
raftService.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v 
-> majority());
         }
 
         List<Peer> peers = raftService.peers();
@@ -297,7 +309,7 @@ public class CmgRaftService implements ManuallyCloseable {
         List<Peer> currentLearners = raftService.learners();
 
         if (currentLearners == null) {
-            return raftService.refreshMembers(true).thenCompose(v -> 
learners());
+            return raftService.refreshMembers(true, 
TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v -> learners());
         }
 
         return completedFuture(currentLearners.stream()
@@ -316,7 +328,7 @@ public class CmgRaftService implements ManuallyCloseable {
         List<Peer> currentLearners = raftService.learners();
 
         if (currentLearners == null) {
-            return raftService.refreshMembers(true).thenCompose(v -> 
updateLearners(term));
+            return raftService.refreshMembers(true, 
TimeAwareRaftGroupService.NO_TIMEOUT).thenCompose(v -> updateLearners(term));
         }
 
         Set<String> currentLearnerNames = currentLearners.stream()
@@ -339,11 +351,11 @@ public class CmgRaftService implements ManuallyCloseable {
         if (newLearners.isEmpty()) {
             // Methods for working with learners do not support empty peer 
lists for some reason.
             // TODO: https://issues.apache.org/jira/browse/IGNITE-26855.
-            return raftService.changePeersAndLearnersAsync(newConfiguration, 
term,  0)
+            return raftService.changePeersAndLearnersAsync(newConfiguration, 
term,  0, TimeAwareRaftGroupService.NO_TIMEOUT)
                     .thenRun(() -> 
raftService.updateConfiguration(newConfiguration));
         } else {
             // TODO: https://issues.apache.org/jira/browse/IGNITE-26855.
-            return raftService.resetLearners(newConfiguration.learners(), 0);
+            return raftService.resetLearners(newConfiguration.learners(), 0, 
TimeAwareRaftGroupService.NO_TIMEOUT);
         }
     }
 
@@ -356,29 +368,37 @@ public class CmgRaftService implements ManuallyCloseable {
         ChangeClusterNameCommand command = 
msgFactory.changeClusterNameCommand()
                 .clusterName(clusterName)
                 .build();
-        return raftService.run(command);
+        return run(command);
     }
 
     /**
      * Changes Metastorage nodes.
      *
+     * @param newMetastorageNodes New metastorage node names.
+     * @param metastorageRepairingConfigIndex Metastorage repairing config 
index (for forceful reconfiguration).
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
      * @return Future that completes when the change is finished.
      */
-    public CompletableFuture<Void> changeMetastorageNodes(Set<String> 
newMetastorageNodes, @Nullable Long metastorageRepairingConfigIndex) {
+    public CompletableFuture<Void> changeMetastorageNodes(
+            Set<String> newMetastorageNodes,
+            @Nullable Long metastorageRepairingConfigIndex,
+            long timeout
+    ) {
         ChangeMetaStorageInfoCommand command = 
msgFactory.changeMetaStorageInfoCommand()
                 .metaStorageNodes(Set.copyOf(newMetastorageNodes))
                 
.metastorageRepairingConfigIndex(metastorageRepairingConfigIndex)
                 .build();
-        return raftService.run(command);
+        return run(command, timeout);
     }
 
     /**
      * Retrieves the Metastorage info.
      *
-     * @return Future that resolves into the metastorage info or {@code null} 
if even cluster state does not exist.
+     * @param timeout Timeout in milliseconds. Use {@link 
TimeAwareRaftGroupService#NO_TIMEOUT} for infinite wait.
+     * @return Future that resolves into the metastorage info or {@code null} 
if cluster state does not exist.
      */
-    public CompletableFuture<MetaStorageInfo> readMetaStorageInfo() {
-        return raftService.run(msgFactory.readMetaStorageInfoCommand().build())
+    public CompletableFuture<MetaStorageInfo> readMetaStorageInfo(long 
timeout) {
+        return run(msgFactory.readMetaStorageInfoCommand().build(), timeout)
                 .thenApply(MetaStorageInfo.class::cast);
     }
 
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/PhysicalTopologyAwareRaftGroupServiceFactory.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/PhysicalTopologyAwareRaftGroupServiceFactory.java
new file mode 100644
index 00000000000..0bd35b54261
--- /dev/null
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/PhysicalTopologyAwareRaftGroupServiceFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.raft.ExceptionFactory;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory;
+import 
org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+
+/**
+ * Factory for creating {@link PhysicalTopologyAwareRaftGroupService} 
instances for CMG.
+ */
+public class PhysicalTopologyAwareRaftGroupServiceFactory implements 
TimeAwareRaftGroupServiceFactory {
+
+    private final ClusterService clusterService;
+
+    private final RaftGroupEventsClientListener eventsClientListener;
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param eventsClientListener Raft events client listener.
+     * @param failureProcessor Failure processor.
+     */
+    public PhysicalTopologyAwareRaftGroupServiceFactory(
+            ClusterService clusterService,
+            RaftGroupEventsClientListener eventsClientListener,
+            FailureProcessor failureProcessor
+    ) {
+        this.clusterService = clusterService;
+        this.eventsClientListener = eventsClientListener;
+        this.failureProcessor = failureProcessor;
+    }
+
+    @Override
+    public PhysicalTopologyAwareRaftGroupService startRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners peersAndLearners,
+            RaftConfiguration raftConfiguration,
+            ScheduledExecutorService raftClientExecutor,
+            Marshaller commandsMarshaller,
+            ExceptionFactory stoppingExceptionFactory
+    ) {
+        return PhysicalTopologyAwareRaftGroupService.start(
+                groupId,
+                clusterService,
+                raftConfiguration,
+                peersAndLearners,
+                raftClientExecutor,
+                eventsClientListener,
+                commandsMarshaller,
+                stoppingExceptionFactory,
+                failureProcessor
+        );
+    }
+}
diff --git 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
index 1dc6948e1c0..e2397280155 100644
--- 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
+++ 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
@@ -48,11 +48,12 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
 import org.apache.ignite.internal.raft.RaftManager;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import 
org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -82,14 +83,15 @@ class ClusterManagementGroupManagerTest extends 
BaseIgniteAbstractTest {
             @Mock LogicalTopology logicalTopology,
             @Mock NodeAttributes nodeAttributes,
             @Mock FailureManager failureManager,
-            @Mock RaftGroupService raftGroupService,
-            @Mock MetricManager metricManager
+            @Mock PhysicalTopologyAwareRaftGroupService raftGroupService,
+            @Mock MetricManager metricManager,
+            @Mock RaftGroupEventsClientListener eventsClientListener
     ) throws NodeStoppingException {
         var addr = new NetworkAddress("localhost", 10_000);
 
         clusterService = ClusterServiceTestUtils.clusterService(testInfo, 
addr.port(), new StaticNodeFinder(List.of(addr)));
 
-        when(raftManager.startSystemRaftGroupNodeAndWaitNodeReady(any(), 
any(), any(), any(), any(), any()))
+        
when(raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(any(), 
any(), any(), any(), any(), any()))
                 .thenReturn(raftGroupService);
 
         ClusterState clusterState = cmgMessagesFactory.clusterState()
@@ -99,12 +101,11 @@ class ClusterManagementGroupManagerTest extends 
BaseIgniteAbstractTest {
                 .version("foo")
                 .build();
 
-        when(raftGroupService.run(any()))
-                .thenReturn(nullCompletedFuture());
+        // General catch-all stub for the timeout overload; it must be declared 
before the more specific stubs because later Mockito stubbings take precedence.
         when(raftGroupService.run(any(), anyLong()))
                 .thenReturn(nullCompletedFuture());
-
-        when(raftGroupService.run(any(InitCmgStateCommand.class)))
+        // More specific stub for InitCmgStateCommand; it must come after the 
general stub so that it overrides it for this command type.
+        when(raftGroupService.run(any(InitCmgStateCommand.class), anyLong()))
                 .thenReturn(completedFuture(clusterState));
 
         cmgManager = new ClusterManagementGroupManager(
@@ -117,6 +118,7 @@ class ClusterManagementGroupManagerTest extends 
BaseIgniteAbstractTest {
                 logicalTopology,
                 nodeAttributes,
                 failureManager,
+                eventsClientListener,
                 new ClusterIdHolder(),
                 RaftGroupOptionsConfigurer.EMPTY,
                 metricManager
diff --git 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
index 6ff0f01a128..33b9ced5421 100644
--- 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
+++ 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftServiceTest.java
@@ -34,8 +34,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.raft.service.RaftCommandRunner;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.Test;
@@ -47,7 +46,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 @ExtendWith(MockitoExtension.class)
 class CmgRaftServiceTest extends BaseIgniteAbstractTest {
     @Mock
-    private RaftGroupService raftGroupService;
+    private TimeAwareRaftGroupService raftGroupService;
 
     @Mock
     private TopologyService topologyService;
@@ -68,7 +67,7 @@ class CmgRaftServiceTest extends BaseIgniteAbstractTest {
 
         assertThat(cmgRaftService.completeJoinCluster(new 
EmptyNodeAttributes()), willCompleteSuccessfully());
 
-        verify(raftGroupService).run(any(), eq(RaftCommandRunner.NO_TIMEOUT));
+        verify(raftGroupService).run(any(), 
eq(TimeAwareRaftGroupService.NO_TIMEOUT));
     }
 
     private static class EmptyNodeAttributes implements NodeAttributes {
diff --git 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 79883be747f..65fefe75e53 100644
--- 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.util.ReverseIterator;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.TestInfo;
 
 /**
@@ -145,7 +146,15 @@ public class MockNode {
                 this.workDir.resolve("partitions/log")
         );
 
-        var raftManager = TestLozaFactory.create(clusterService, 
raftConfiguration, systemLocalConfiguration, new HybridClockImpl());
+        var eventsClientListener = new RaftGroupEventsClientListener();
+
+        var raftManager = TestLozaFactory.create(
+                clusterService,
+                raftConfiguration,
+                systemLocalConfiguration,
+                new HybridClockImpl(),
+                eventsClientListener
+        );
 
         var clusterStateStorage =
                 new 
RocksDbClusterStateStorage(this.workDir.resolve("cmg/data"), 
clusterService.nodeName());
@@ -179,6 +188,7 @@ public class MockNode {
                 new LogicalTopologyImpl(clusterStateStorage, failureManager),
                 collector,
                 failureManager,
+                eventsClientListener,
                 clusterIdHolder,
                 cmgRaftConfigurer,
                 new NoOpMetricManager(),
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index fc0e4728fa8..1804d6dc70d 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1317,6 +1317,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     logicalTopology,
                     new NodeAttributesCollector(nodeAttributes, 
storageConfiguration),
                     failureManager,
+                    raftGroupEventsClientListener,
                     clusterIdService,
                     cmgRaftConfigurer,
                     metricManager
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 5fed98e9214..034c552ddd9 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -197,6 +197,7 @@ abstract class ItMetaStorageMultipleNodesAbstractTest 
extends IgniteAbstractTest
                     logicalTopology,
                     new NodeAttributesCollector(nodeAttributes, 
storageConfiguration),
                     failureManager,
+                    raftGroupEventsClientListener,
                     new ClusterIdHolder(),
                     cmgRaftConfigurator,
                     new NoOpMetricManager()
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index a2868e7655a..2ec4b15594d 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -215,6 +215,7 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
                     logicalTopology,
                     new NodeAttributesCollector(nodeAttributes, 
storageConfiguration),
                     failureManager,
+                    raftGroupEventsClientListener,
                     new ClusterIdHolder(),
                     cmgRaftConfigurer,
                     metricManager
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 65b9de944a1..55efc71c922 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -441,6 +441,7 @@ public class Node {
                 logicalTopology,
                 new NodeAttributesCollector(nodeAttributesConfiguration, 
storageConfiguration),
                 failureManager,
+                raftGroupEventsClientListener,
                 clusterIdHolder,
                 cmgRaftConfigurer,
                 metricManager
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index 595ffccb24b..16ddb37eec7 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.jetbrains.annotations.Nullable;
 
@@ -83,6 +84,31 @@ public interface RaftManager extends IgniteComponent {
             RaftGroupOptionsConfigurer groupOptionsConfigurer
     ) throws NodeStoppingException;
 
+    /**
+     * Starts a Raft group and a time-aware Raft service on the current node, 
using the given service factory.
+     *
+     * <p>Synchronously waits for the Raft log to be applied.
+     *
+     * <p>This method is similar to {@link 
#startSystemRaftGroupNodeAndWaitNodeReady} but creates a
+     * {@link TimeAwareRaftGroupService} instead of a {@link RaftGroupService}.
+     *
+     * @param nodeId Raft node ID.
+     * @param configuration Peers and Learners of the Raft group.
+     * @param lsnr Raft group listener.
+     * @param eventsLsnr Raft group events listener.
+     * @param factory Service factory for creating time-aware Raft group 
service.
+     * @param groupOptionsConfigurer Configures raft log and snapshot storages.
+     * @throws NodeStoppingException If node stopping intention was detected.
+     */
+    TimeAwareRaftGroupService 
startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
+            RaftNodeId nodeId,
+            PeersAndLearners configuration,
+            RaftGroupListener lsnr,
+            RaftGroupEventsListener eventsLsnr,
+            TimeAwareRaftGroupServiceFactory factory,
+            RaftGroupOptionsConfigurer groupOptionsConfigurer
+    ) throws NodeStoppingException;
+
     /**
      * Stops a given local Raft node.
      *
@@ -141,6 +167,28 @@ public interface RaftManager extends IgniteComponent {
             boolean isSystemGroup
     ) throws NodeStoppingException;
 
+    /**
+     * Creates a time-aware Raft group service providing operations on a Raft 
group, using the given factory.
+     *
+     * <p>This method is similar to {@link 
#startRaftGroupService(ReplicationGroupId, PeersAndLearners, 
RaftServiceFactory, Marshaller,
+     * ExceptionFactory, boolean)} but creates a {@link 
TimeAwareRaftGroupService} instead of a {@link RaftGroupService}.
+     *
+     * @param groupId Raft group ID.
+     * @param configuration Peers and Learners of the Raft group.
+     * @param factory Factory that should be used to create raft service.
+     * @param stoppingExceptionFactory Exception factory used to create 
exceptions thrown to indicate that the object is being stopped.
+     * @param isSystemGroup Whether the group service is for a system group or 
not.
+     * @return Time-aware Raft group service.
+     * @throws NodeStoppingException If node stopping intention was detected.
+     */
+    TimeAwareRaftGroupService startTimeAwareRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners configuration,
+            TimeAwareRaftGroupServiceFactory factory,
+            ExceptionFactory stoppingExceptionFactory,
+            boolean isSystemGroup
+    ) throws NodeStoppingException;
+
     /**
      * Destroys Raft group node storages (log storage, metadata storage and 
snapshots storage).
      *
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/TimeAwareRaftGroupServiceFactory.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/TimeAwareRaftGroupServiceFactory.java
new file mode 100644
index 00000000000..5aaf3273896
--- /dev/null
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/TimeAwareRaftGroupServiceFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/**
+ * Factory for creating {@link TimeAwareRaftGroupService} instances.
+ *
+ * <p>This factory is similar to {@link RaftServiceFactory} but creates 
services that implement
+ * the {@link TimeAwareRaftGroupService} interface instead of requiring the 
service to extend
+ * {@link org.apache.ignite.internal.raft.service.RaftGroupService}.
+ */
+@FunctionalInterface
+public interface TimeAwareRaftGroupServiceFactory {
+    /**
+     * Creates a time-aware Raft group service.
+     *
+     * @param groupId Group id.
+     * @param peersAndLearners Peers configuration.
+     * @param raftConfiguration Raft configuration.
+     * @param raftClientExecutor Client executor.
+     * @param commandsMarshaller Marshaller that should be used to serialize 
commands.
+     * @param stoppingExceptionFactory Exception factory used to create 
exceptions thrown to indicate that the object is being stopped.
+     * @return New time-aware Raft client.
+     */
+    TimeAwareRaftGroupService startRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners peersAndLearners,
+            RaftConfiguration raftConfiguration,
+            ScheduledExecutorService raftClientExecutor,
+            Marshaller commandsMarshaller,
+            ExceptionFactory stoppingExceptionFactory
+    );
+}
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
index 25c0e3f8063..b1059092bb4 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java
@@ -52,6 +52,8 @@ import org.jetbrains.annotations.Nullable;
  * <p>All async operations provided by the service are not cancellable.
  */
 public interface TimeAwareRaftGroupService {
+    /** Constant meaning that an operation will never time out. */
+    long NO_TIMEOUT = Long.MAX_VALUE;
 
     /**
      * Runs a command on a replication group leader with the given timeout.
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 4f3fcb90c1f..e779040a050 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.raft.server.impl.GroupStoragesContextResolver;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService;
 import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents;
 import 
org.apache.ignite.internal.raft.storage.impl.NoopGroupStoragesDestructionIntents;
 import org.apache.ignite.internal.raft.storage.impl.StorageDestructionIntent;
@@ -365,6 +366,50 @@ public class Loza implements RaftManager {
         }
     }
 
+    @Override
+    public TimeAwareRaftGroupService 
startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(
+            RaftNodeId nodeId,
+            PeersAndLearners configuration,
+            RaftGroupListener lsnr,
+            RaftGroupEventsListener eventsLsnr,
+            TimeAwareRaftGroupServiceFactory factory,
+            RaftGroupOptionsConfigurer groupOptionsConfigurer
+    ) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            RaftGroupOptions raftGroupOptions = RaftGroupOptions.defaults()
+                    .setSystemGroup(true);
+
+            groupOptionsConfigurer.configure(raftGroupOptions);
+
+            // Start the raft node first (server side).
+            startRaftGroupNodeInternal(
+                    nodeId,
+                    configuration,
+                    lsnr,
+                    eventsLsnr,
+                    raftGroupOptions,
+                    null,
+                    StoppingExceptionFactories.indicateNodeStop()
+            );
+
+            // Create the time-aware service using the provided factory.
+            return factory.startRaftGroupService(
+                    nodeId.groupId(),
+                    configuration,
+                    raftConfiguration,
+                    executor,
+                    opts.getCommandsMarshaller(),
+                    StoppingExceptionFactories.indicateNodeStop()
+            );
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     @Override
     public RaftGroupService startRaftGroupService(ReplicationGroupId groupId, 
PeersAndLearners configuration, boolean isSystemGroup)
             throws NodeStoppingException {
@@ -422,6 +467,32 @@ public class Loza implements RaftManager {
         }
     }
 
+    @Override
+    public TimeAwareRaftGroupService startTimeAwareRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners configuration,
+            TimeAwareRaftGroupServiceFactory factory,
+            ExceptionFactory stoppingExceptionFactory,
+            boolean isSystemGroup // NOTE(review): not read anywhere in this method body - confirm whether it should influence the service.
+    ) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) { // The busy lock could not be entered: report it to the caller as NodeStoppingException.
+            throw new NodeStoppingException();
+        }
+
+        try {
+            return factory.startRaftGroupService( // Only delegates to the factory; this method does not start a raft node itself.
+                    groupId,
+                    configuration,
+                    raftConfiguration,
+                    executor,
+                    opts.getCommandsMarshaller(),
+                    stoppingExceptionFactory
+            );
+        } finally { // Release the busy lock on every exit path, including when the factory throws.
+            busyLock.leaveBusy();
+        }
+    }
+
     @Override
     public void destroyRaftNodeStorages(RaftNodeId nodeId, 
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer)
             throws NodeStoppingException {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
index 7c7ec1a6434..6efa0bb7929 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.failure.FailureContext;
-import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterService;
@@ -84,8 +84,8 @@ public class PhysicalTopologyAwareRaftGroupService implements 
TimeAwareRaftGroup
     /** General leader election listener. */
     private final ServerEventHandler generalLeaderElectionListener;
 
-    /** Failure manager. */
-    private final FailureManager failureManager;
+    /** Failure processor. */
+    private final FailureProcessor failureProcessor;
 
     /** Cluster service. */
     private final ClusterService clusterService;
@@ -117,7 +117,7 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
     /**
      * Constructor.
      *
-     * @param failureManager Failure manager.
+     * @param failureProcessor Failure processor.
      * @param clusterService Cluster service.
      * @param executor Executor to invoke RPC requests and notify listeners.
      * @param raftConfiguration RAFT configuration.
@@ -125,7 +125,7 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
      */
     private PhysicalTopologyAwareRaftGroupService(
             ReplicationGroupId groupId,
-            FailureManager failureManager,
+            FailureProcessor failureProcessor,
             ClusterService clusterService,
             ScheduledExecutorService executor,
             RaftConfiguration raftConfiguration,
@@ -135,7 +135,7 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
             RaftGroupEventsClientListener eventsClientListener
     ) {
         this.groupId = groupId;
-        this.failureManager = failureManager;
+        this.failureProcessor = failureProcessor;
         this.clusterService = clusterService;
         this.stoppingExceptionFactory = stoppingExceptionFactory;
         this.peers = List.copyOf(configuration.peers());
@@ -247,7 +247,7 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
 
             return subscriptionMessageSender.send(member, 
msg).whenComplete((isSent, err) -> {
                 if (err != null) {
-                    failureManager.process(new FailureContext(err, "Could not 
change subscription to leader updates [grp="
+                    failureProcessor.process(new FailureContext(err, "Could 
not change subscription to leader updates [grp="
                             + groupId() + "]."));
                 }
 
@@ -270,7 +270,7 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
      * @param eventsClientListener Listener for RAFT group events.
      * @param cmdMarshaller Marshaller for RAFT commands.
      * @param stoppingExceptionFactory Factory for creating stopping 
exceptions.
-     * @param failureManager Manager for handling failures.
+     * @param failureProcessor Processor for handling failures.
      * @return A new instance of PhysicalTopologyAwareRaftGroupService.
      */
     public static PhysicalTopologyAwareRaftGroupService start(
@@ -282,11 +282,11 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
             RaftGroupEventsClientListener eventsClientListener,
             Marshaller cmdMarshaller,
             ExceptionFactory stoppingExceptionFactory,
-            FailureManager failureManager
+            FailureProcessor failureProcessor
     ) {
         return new PhysicalTopologyAwareRaftGroupService(
                 groupId,
-                failureManager,
+                failureProcessor,
                 cluster,
                 executor,
                 raftConfiguration,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index e52804085a9..a3362dd088c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -220,6 +220,7 @@ public class ItDistributedConfigurationPropertiesTest 
extends BaseIgniteAbstract
                     logicalTopology,
                     new NodeAttributesCollector(nodeAttributes, 
storageConfiguration),
                     failureManager,
+                    raftGroupEventsClientListener,
                     new ClusterIdHolder(),
                     cmgRaftConfigurer,
                     metricManager
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 96f6143aea7..383739ea49c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -194,6 +194,7 @@ public class ItDistributedConfigurationStorageTest extends 
BaseIgniteAbstractTes
                     logicalTopology,
                     new NodeAttributesCollector(nodeAttributes, 
storageConfiguration),
                     failureManager,
+                    raftGroupEventsClientListener,
                     new ClusterIdHolder(),
                     cmgRaftConfigurer,
                     metricManager
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7eff453dd2a..6d136c98722 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -468,6 +468,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 logicalTopology,
                 new NodeAttributesCollector(nodeAttributes, 
storageConfiguration),
                 failureProcessor,
+                raftGroupEventsClientListener,
                 clusterIdService,
                 cmgRaftConfigurer,
                 metricManager
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 59d56e84289..a3ccf925804 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -769,6 +769,7 @@ public class IgniteImpl implements Ignite {
                 validationManager,
                 nodeAttributesCollector,
                 failureManager,
+                raftGroupEventsClientListener,
                 clusterIdService,
                 cmgRaftConfigurer,
                 metricManager

Reply via email to