This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a50e7f0081 IGNITE-23010 Remove getLeader flag from
RaftGroupServiceImpl#start (#4545)
a50e7f0081 is described below
commit a50e7f008138944ea8d8830c1a83c65cb12307b7
Author: Mikhail Efremov <[email protected]>
AuthorDate: Tue Oct 15 19:59:12 2024 +0600
IGNITE-23010 Remove getLeader flag from RaftGroupServiceImpl#start (#4545)
---
.../management/raft/ItCmgRaftServiceTest.java | 8 +-
.../management/ClusterManagementGroupManager.java | 31 +--
.../ClusterManagementGroupManagerTest.java | 4 +-
.../impl/ItIdempotentCommandCacheTest.java | 6 +-
.../metastorage/impl/ItMetaStorageServiceTest.java | 17 +-
.../server/raft/ItMetaStorageRaftGroupTest.java | 21 +-
.../service/ItAbstractListenerSnapshotTest.java | 10 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 59 +++---
.../MetaStorageDeployWatchesCorrectnessTest.java | 4 +-
.../impl/MetaStorageManagerRecoveryTest.java | 7 +-
.../impl/StandaloneMetaStorageManager.java | 8 +-
.../placementdriver/PlacementDriverManager.java | 6 +-
.../apache/ignite/internal/raft/RaftManager.java | 20 +-
.../ignite/internal/raft/RaftServiceFactory.java | 5 +-
.../ignite/internal/raft/ItLearnersTest.java | 135 ++++++-------
.../apache/ignite/internal/raft/ItLozaTest.java | 20 +-
.../internal/raft/ItRaftGroupServiceTest.java | 60 +++---
.../raft/ItTruncateSuffixAndRestartTest.java | 15 +-
.../raft/server/ItSimpleCounterServerTest.java | 6 +-
.../ignite/raft/server/JraftAbstractTest.java | 3 +-
.../java/org/apache/ignite/internal/raft/Loza.java | 25 ++-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 28 +--
.../org/apache/ignite/internal/raft/LozaTest.java | 2 +-
.../ignite/internal/raft/RaftGroupServiceTest.java | 8 +-
.../raft/client/TopologyAwareRaftGroupService.java | 19 +-
.../TopologyAwareRaftGroupServiceFactory.java | 4 +-
.../ignite/internal/replicator/ReplicaManager.java | 214 ++++++---------------
.../internal/replicator/ReplicaManagerTest.java | 3 +-
.../AbstractTopologyAwareGroupServiceTest.java | 3 +-
.../ItMetastorageGroupDisasterRecoveryTest.java | 3 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 2 +-
.../ReplicasSafeTimePropagationTest.java | 59 +++---
.../distributed/TableManagerRecoveryTest.java | 3 +-
.../table/distributed/TableManagerTest.java | 4 +-
34 files changed, 320 insertions(+), 502 deletions(-)
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 6f4d544269..98b0407ac9 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -142,14 +142,14 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
Peer serverPeer = configuration.peer(localMember().name());
- CompletableFuture<RaftGroupService> raftService;
+ RaftGroupService raftService;
if (serverPeer == null) {
raftService =
raftManager.startRaftGroupService(CmgGroupId.INSTANCE, configuration);
} else {
var clusterStateStorageMgr = new
ClusterStateStorageManager(clusterStateStorage);
- raftService =
raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
+ raftService =
raftManager.startRaftGroupNodeAndWaitNodeReady(
new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
configuration,
new CmgRaftGroupListener(
@@ -164,9 +164,7 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
);
}
- assertThat(raftService, willCompleteSuccessfully());
-
- this.raftService = new CmgRaftService(raftService.join(),
clusterService, logicalTopology);
+ this.raftService = new CmgRaftService(raftService,
clusterService, logicalTopology);
} catch (InterruptedException | NodeStoppingException e) {
throw new RuntimeException(e);
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index a067fce5e2..233b7da41a 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -773,21 +774,21 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
assert serverPeer != null;
try {
- return raftManager
- .startRaftGroupNodeAndWaitNodeReadyFuture(
- raftNodeId(serverPeer),
- raftConfiguration,
- new CmgRaftGroupListener(
- clusterStateStorageMgr,
- logicalTopology,
- validationManager,
- this::onLogicalTopologyChanged,
- clusterIdStore
- ),
- this::onElectedAsLeader,
- raftGroupOptionsConfigurer
- )
- .thenApply(service -> new CmgRaftService(service,
clusterService, logicalTopology));
+ RaftGroupService service =
raftManager.startRaftGroupNodeAndWaitNodeReady(
+ raftNodeId(serverPeer),
+ raftConfiguration,
+ new CmgRaftGroupListener(
+ clusterStateStorageMgr,
+ logicalTopology,
+ validationManager,
+ this::onLogicalTopologyChanged,
+ clusterIdStore
+ ),
+ this::onElectedAsLeader,
+ raftGroupOptionsConfigurer
+ );
+
+ return completedFuture(new CmgRaftService(service, clusterService,
logicalTopology));
} catch (Exception e) {
return failedFuture(e);
}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
index c292164e20..97998d896c 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
@@ -86,8 +86,8 @@ class ClusterManagementGroupManagerTest extends
BaseIgniteAbstractTest {
clusterService = ClusterServiceTestUtils.clusterService(testInfo,
addr.port(), new StaticNodeFinder(List.of(addr)));
- when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(),
any(), any(), any(), any()))
- .thenReturn(completedFuture(raftGroupService));
+ when(raftManager.startRaftGroupNodeAndWaitNodeReady(any(), any(),
any(), any(), any()))
+ .thenReturn(raftGroupService);
ClusterState clusterState = cmgMessagesFactory.clusterState()
.clusterTag(cmgMessagesFactory.clusterTag().clusterId(UUID.randomUUID()).clusterName("foo").build())
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 61c2ff973e..7eb0114065 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -525,12 +525,8 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
.fromConsistentIds(nodes.stream().map(n ->
n.clusterService.nodeName()).collect(toSet()));
try {
- CompletableFuture<RaftGroupService> raftServiceFuture =
node.raftManager
+ return node.raftManager
.startRaftGroupService(MetastorageGroupId.INSTANCE,
configuration);
-
- assertThat(raftServiceFuture, willCompleteSuccessfully());
-
- return raftServiceFuture.join();
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index 8f3370172c..48b176035f 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -222,13 +222,16 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
}
void start(PeersAndLearners configuration) {
- CompletableFuture<RaftGroupService> raftService =
- startAsync(new ComponentContext(), clusterService,
partitionsLogStorageFactory, raftManager)
- .thenCompose(unused ->
startRaftService(configuration));
+ CompletableFuture<Void> startFuture = startAsync(
+ new ComponentContext(),
+ clusterService,
+ partitionsLogStorageFactory,
+ raftManager
+ );
- assertThat(raftService, willCompleteSuccessfully());
+ assertThat(startFuture, willCompleteSuccessfully());
- metaStorageRaftService = raftService.join();
+ metaStorageRaftService = startRaftService(configuration);
metaStorageService = new MetaStorageServiceImpl(
clusterService.nodeName(),
@@ -243,7 +246,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
return clusterService.nodeName();
}
- private CompletableFuture<RaftGroupService>
startRaftService(PeersAndLearners configuration) {
+ private RaftGroupService startRaftService(PeersAndLearners
configuration) {
String name = name();
boolean isLearner = configuration.peer(name) == null;
@@ -257,7 +260,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
try {
- return raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
+ return raftManager.startRaftGroupNodeAndWaitNodeReady(
raftNodeId,
configuration,
listener,
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 920ab7bdf1..2792d5b1e3 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -480,38 +480,35 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
groupOptions3
);
- metaStorageRaftGrpSvc1 =
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
+ metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start(
MetastorageGroupId.INSTANCE,
cluster.get(0),
FACTORY,
raftConfiguration,
membersConfiguration,
- true,
executor,
commandsMarshaller
- ));
+ );
- metaStorageRaftGrpSvc2 =
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
+ metaStorageRaftGrpSvc2 = RaftGroupServiceImpl.start(
MetastorageGroupId.INSTANCE,
cluster.get(1),
FACTORY,
raftConfiguration,
membersConfiguration,
- true,
executor,
commandsMarshaller
- ));
+ );
- metaStorageRaftGrpSvc3 =
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
+ metaStorageRaftGrpSvc3 = RaftGroupServiceImpl.start(
MetastorageGroupId.INSTANCE,
cluster.get(2),
FACTORY,
raftConfiguration,
membersConfiguration,
- true,
executor,
commandsMarshaller
- ));
+ );
assertTrue(waitForCondition(
() -> sameLeaders(metaStorageRaftGrpSvc1,
metaStorageRaftGrpSvc2, metaStorageRaftGrpSvc3), 10_000),
@@ -527,12 +524,6 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
return raftServersRaftGroups;
}
- private static RaftGroupService
waitForRaftGroupServiceSafely(CompletableFuture<RaftGroupService> future) {
- assertThat(future, willCompleteSuccessfully());
-
- return future.join();
- }
-
/**
* Checks if all raft groups have the same leader.
*
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 3e8bed7160..5ec5623e56 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -498,13 +498,11 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
Marshaller commandsMarshaller = commandsMarshaller(clientNode);
- CompletableFuture<RaftGroupService> clientFuture = RaftGroupServiceImpl
- .start(groupId, clientNode, FACTORY, raftConfiguration,
initialMemberConf, true, executor, commandsMarshaller);
+ RaftGroupService client = RaftGroupServiceImpl
+ .start(groupId, clientNode, FACTORY, raftConfiguration,
initialMemberConf, executor, commandsMarshaller);
- assertThat(clientFuture, willCompleteSuccessfully());
+ clients.add(client);
- clients.add(clientFuture.join());
-
- return clientFuture.join();
+ return client;
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 17e56e20fc..d738d555a1 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.Collections.emptySet;
import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
@@ -77,6 +78,7 @@ import
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -437,7 +439,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
MetaStorageListener raftListener = new MetaStorageListener(storage,
clusterTime, this::onConfigurationCommitted);
- CompletableFuture<TopologyAwareRaftGroupService> serviceFuture =
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
+ TopologyAwareRaftGroupService service =
raftMgr.startRaftGroupNodeAndWaitNodeReady(
raftNodeId(localPeer),
configuration,
raftListener,
@@ -453,28 +455,30 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
);
- serviceFuture
- .thenAccept(service -> service.subscribeLeader(new
MetaStorageLeaderElectionListener(
- busyLock,
- clusterService,
- logicalTopologyService,
- metaStorageSvcFut,
- learnerManager,
- clusterTime,
- // We use the "deployWatchesFuture" to guarantee that
the Configuration Manager will be started
- // when the underlying code tries to read Meta Storage
configuration. This is a consequence of having a circular
- // dependency between these two components.
- deployWatchesFuture.thenApply(v ->
localMetaStorageConfiguration),
- electionListeners,
- this::peersChangeStateExists
- )))
- .whenComplete((v, e) -> {
+ LeaderElectionListener leaderElectionListener = new
MetaStorageLeaderElectionListener(
+ busyLock,
+ clusterService,
+ logicalTopologyService,
+ metaStorageSvcFut,
+ learnerManager,
+ clusterTime,
+ // We use the "deployWatchesFuture" to guarantee that the
Configuration Manager will be started
+ // when the underlying code tries to read Meta Storage
configuration. This is a consequence of having a circular
+ // dependency between these two components.
+ deployWatchesFuture.thenApply(v ->
localMetaStorageConfiguration),
+ electionListeners,
+ this::peersChangeStateExists
+ );
+
+ return completedFuture(service)
+ .thenAccept(s -> s.subscribeLeader(leaderElectionListener))
+ .handle((v, e) -> {
if (e != null) {
LOG.error("Unable to register
MetaStorageLeaderElectionListener", e);
}
- });
- return serviceFuture;
+ return service;
+ });
}
private boolean peersChangeStateExists() {
@@ -1018,20 +1022,21 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
PeersAndLearners raftClientConfiguration,
Function<RaftGroupService, CompletableFuture<T>> action
) {
- return startOneOffRaftGroupService(raftClientConfiguration)
- .thenCompose(raftGroupService -> action.apply(raftGroupService)
- .whenComplete((res, ex) -> raftGroupService.shutdown())
- );
- }
-
- private CompletableFuture<RaftGroupService>
startOneOffRaftGroupService(PeersAndLearners newConfiguration) {
try {
- return raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE,
newConfiguration);
+ RaftGroupService raftGroupService =
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE,
raftClientConfiguration);
+
+ return action.apply(raftGroupService)
+ .whenComplete((res, ex) -> raftGroupService.shutdown());
} catch (NodeStoppingException e) {
return failedFuture(e);
}
}
+ private RaftGroupService startOneOffRaftGroupService(PeersAndLearners
newConfiguration)
+ throws NodeStoppingException {
+ return raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE,
newConfiguration);
+ }
+
@TestOnly
public CompletableFuture<MetaStorageServiceImpl> metaStorageService() {
return metaStorageSvcFut;
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index 0bc7101ed3..c51a3081f5 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -77,8 +77,8 @@ public class MetaStorageDeployWatchesCorrectnessTest extends
IgniteAbstractTest
new
CmgMessagesFactory().metaStorageInfo().metaStorageNodes(Set.of(mcNodeName)).build()
));
when(clusterService.nodeName()).thenReturn(mcNodeName);
- when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(),
any(), any(), any(), any(), any(), any()))
- .thenReturn(completedFuture(raftGroupService));
+ when(raftManager.startRaftGroupNodeAndWaitNodeReady(any(), any(),
any(), any(), any(), any(), any()))
+ .thenReturn(raftGroupService);
when(raftGroupService.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation
-> completedFuture(0L));
return Stream.of(
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index 8925e954a8..d5b471da2d 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -100,13 +101,13 @@ public class MetaStorageManagerRecoveryTest extends
BaseIgniteAbstractTest {
private RaftManager raftManager(long remoteRevision) throws Exception {
RaftManager raft = mock(RaftManager.class);
- RaftGroupService service = mock(RaftGroupService.class);
+ RaftGroupService service = mock(TopologyAwareRaftGroupService.class);
when(service.run(any(GetCurrentRevisionCommand.class)))
.thenAnswer(invocation -> completedFuture(remoteRevision));
- when(raft.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(),
any(), any(), any(), any(), any()))
- .thenAnswer(invocation -> completedFuture(service));
+ when(raft.startRaftGroupNodeAndWaitNodeReady(any(), any(), any(),
any(), any(), any(), any()))
+ .thenAnswer(invocation -> service);
return raft;
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 5538cf92a0..610463815e 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -176,16 +176,16 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
TopologyAwareRaftGroupService raftGroupService =
mock(TopologyAwareRaftGroupService.class);
try {
- when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
+ when(raftManager.startRaftGroupNodeAndWaitNodeReady(
any(),
any(),
listenerCaptor.capture(),
any(),
any(),
any()
- )).thenReturn(completedFuture(raftGroupService));
+ )).thenReturn(raftGroupService);
- when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
+ when(raftManager.startRaftGroupNodeAndWaitNodeReady(
any(),
any(),
listenerCaptor.capture(),
@@ -193,7 +193,7 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
any(),
any(),
any()
- )).thenReturn(completedFuture(raftGroupService));
+ )).thenReturn(raftGroupService);
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 4271066b66..ea3e7e4e88 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -167,12 +167,14 @@ public class PlacementDriverManager implements
IgniteComponent {
try {
leaseUpdater.init();
- return raftManager.startRaftGroupService(
+ TopologyAwareRaftGroupService raftClient =
raftManager.startRaftGroupService(
replicationGroupId,
PeersAndLearners.fromConsistentIds(placementDriverNodes),
topologyAwareRaftGroupServiceFactory,
null // Use default commands marshaller.
- ).thenCompose(client ->
client.subscribeLeader(this::onLeaderChange).thenApply(v -> client));
+ );
+
+ return
raftClient.subscribeLeader(this::onLeaderChange).thenApply(v -> raftClient);
} catch (NodeStoppingException e) {
return failedFuture(e);
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index f2037795a4..8709ce5269 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.raft;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -50,7 +49,7 @@ public interface RaftManager extends IgniteComponent {
* @param factory Service factory.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+ <T extends RaftGroupService> T startRaftGroupNode(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -72,7 +71,7 @@ public interface RaftManager extends IgniteComponent {
* @throws NodeStoppingException If node stopping intention was detected.
*/
// FIXME: IGNITE-19047 Meta storage and cmg raft log re-application in
async manner
- CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ RaftGroupService startRaftGroupNodeAndWaitNodeReady(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -94,7 +93,7 @@ public interface RaftManager extends IgniteComponent {
* @throws NodeStoppingException If node stopping intention was detected.
*/
// FIXME: IGNITE-19047 Meta storage and cmg raft log re-application in
async manner
- CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ RaftGroupService startRaftGroupNodeAndWaitNodeReady(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -117,7 +116,7 @@ public interface RaftManager extends IgniteComponent {
* @throws NodeStoppingException If node stopping intention was detected.
*/
// FIXME: IGNITE-19047 Meta storage and cmg raft log re-application in
async manner
- <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ <T extends RaftGroupService> T startRaftGroupNodeAndWaitNodeReady(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -153,10 +152,10 @@ public interface RaftManager extends IgniteComponent {
*
* @param groupId Raft group ID.
* @param configuration Peers and Learners of the Raft group.
- * @return Future that will be completed with an instance of a Raft group
service.
+ * @return An instance of a Raft group service.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- CompletableFuture<RaftGroupService>
startRaftGroupService(ReplicationGroupId groupId, PeersAndLearners
configuration)
+ RaftGroupService startRaftGroupService(ReplicationGroupId groupId,
PeersAndLearners configuration)
throws NodeStoppingException;
/**
@@ -165,11 +164,12 @@ public interface RaftManager extends IgniteComponent {
* @param groupId Raft group ID.
* @param configuration Peers and Learners of the Raft group.
* @param factory Factory that should be used to create raft service.
- * @param commandsMarshaller Marshaller that should be used to serialize
commands. {@code null} if default marshaller should be used.
- * @return Future that will be completed with an instance of a Raft group
service.
+ * @param commandsMarshaller Marshaller that should be used to serialize
commands. {@code null} if default marshaller should be
+ * used.
+ * @return Raft group service.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
+ <T extends RaftGroupService> T startRaftGroupService(
ReplicationGroupId groupId,
PeersAndLearners configuration,
RaftServiceFactory<T> factory,
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
index c800a07cb9..abaa546e8f 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.raft;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -35,9 +34,9 @@ public interface RaftServiceFactory<T extends
RaftGroupService> {
* @param raftConfiguration Raft configuration.
* @param raftClientExecutor Client executor.
* @param commandsMarshaller Marshaller that should be used to serialize
commands.
- * @return Future that contains client when completes.
+ * @return New Raft client.
*/
- CompletableFuture<T> startRaftGroupService(
+ T startRaftGroupService(
ReplicationGroupId groupId,
PeersAndLearners peersAndLearners,
RaftConfiguration raftConfiguration,
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
index 7db4ad2d05..e9e8fa49ec 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -21,7 +21,6 @@ import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
@@ -189,26 +188,28 @@ public class ItLearnersTest extends IgniteAbstractTest {
List<Peer> serverPeers = nodesToPeers(configuration,
List.of(follower), learners);
- List<CompletableFuture<RaftGroupService>> services =
IntStream.range(0, nodes.size())
+ List<RaftGroupService> services = IntStream.range(0, nodes.size())
.mapToObj(i -> startRaftGroup(nodes.get(i),
serverPeers.get(i), configuration, listeners.get(i)))
.collect(toList());
// Check that learners and peers have been set correctly.
services.forEach(service -> {
- CompletableFuture<RaftGroupService> refreshMembers = service
- .thenCompose(s -> s.refreshMembers(true).thenApply(v ->
s));
+ CompletableFuture<Void> refreshMembers =
service.refreshMembers(true);
- assertThat(refreshMembers.thenApply(RaftGroupService::leader),
willBe(follower.asPeer()));
- assertThat(refreshMembers.thenApply(RaftGroupService::peers),
will(contains(follower.asPeer())));
- assertThat(refreshMembers.thenApply(RaftGroupService::learners),
will(containsInAnyOrder(toPeerArray(learners))));
+ assertThat(refreshMembers, willCompleteSuccessfully());
+
+ assertThat(service.leader(), is(follower.asPeer()));
+ assertThat(service.peers(), contains(follower.asPeer()));
+ assertThat(service.learners(),
containsInAnyOrder(toPeerArray(learners)));
});
listeners.forEach(listener -> assertThat(listener.storage,
is(empty())));
// Test writing data.
- CompletableFuture<?> writeFuture = services.get(0)
- .thenCompose(s -> s.run(createWriteCommand("foo")).thenApply(v
-> s))
- .thenCompose(s -> s.run(createWriteCommand("bar")));
+ RaftGroupService service = services.get(0);
+
+ CompletableFuture<?> writeFuture =
service.run(createWriteCommand("foo"))
+ .thenRun(() -> service.run(createWriteCommand("bar")));
assertThat(writeFuture, willCompleteSuccessfully());
@@ -228,22 +229,19 @@ public class ItLearnersTest extends IgniteAbstractTest {
PeersAndLearners configuration =
createConfiguration(List.of(follower), List.of());
- CompletableFuture<RaftGroupService> service1 = startRaftGroup(
+ RaftGroupService service1 = startRaftGroup(
follower,
configuration.peer(follower.consistentId()),
configuration,
new TestRaftGroupListener()
);
- assertThat(
- service1.thenCompose(service -> service.refreshLeader()
- .thenApply(v -> service.leader())),
- willBe(follower.asPeer())
- );
- assertThat(service1.thenApply(RaftGroupService::learners),
willBe(empty()));
+ assertThat(service1.refreshLeader(), willCompleteSuccessfully());
+
+ assertThat(service1.leader(), is(follower.asPeer()));
+ assertThat(service1.learners(), is(empty()));
- CompletableFuture<Void> addLearners = service1
- .thenCompose(s ->
s.addLearners(Arrays.asList(toPeerArray(learners))));
+ CompletableFuture<Void> addLearners =
service1.addLearners(Arrays.asList(toPeerArray(learners)));
assertThat(addLearners, willCompleteSuccessfully());
@@ -251,17 +249,18 @@ public class ItLearnersTest extends IgniteAbstractTest {
RaftNode learner1 = nodes.get(1);
- CompletableFuture<RaftGroupService> service2 =
+ RaftGroupService service2 =
startRaftGroup(learner1,
newConfiguration.learner(learner1.consistentId()), newConfiguration, new
TestRaftGroupListener());
// Check that learners and peers have been set correctly.
Stream.of(service1, service2).forEach(service -> {
- CompletableFuture<RaftGroupService> refreshMembers = service
- .thenCompose(s -> s.refreshMembers(true).thenApply(v ->
s));
+ CompletableFuture<Void> refreshMembers =
service.refreshMembers(true);
- assertThat(refreshMembers.thenApply(RaftGroupService::leader),
willBe(follower.asPeer()));
- assertThat(refreshMembers.thenApply(RaftGroupService::peers),
will(contains(follower.asPeer())));
- assertThat(refreshMembers.thenApply(RaftGroupService::learners),
will(containsInAnyOrder(toPeerArray(learners))));
+ assertThat(refreshMembers, willCompleteSuccessfully());
+
+ assertThat(service.leader(), is(follower.asPeer()));
+ assertThat(service.peers(), contains(follower.asPeer()));
+ assertThat(service.learners(),
containsInAnyOrder(toPeerArray(learners)));
});
}
@@ -277,19 +276,19 @@ public class ItLearnersTest extends IgniteAbstractTest {
List<Peer> serverPeers = nodesToPeers(configuration,
List.of(follower), learners);
- List<CompletableFuture<RaftGroupService>> services =
IntStream.range(0, nodes.size())
+ List<RaftGroupService> services = IntStream.range(0, nodes.size())
.mapToObj(i -> startRaftGroup(nodes.get(i),
serverPeers.get(i), configuration, new TestRaftGroupListener()))
.collect(toList());
// Wait for the leader to be elected.
services.forEach(service -> assertThat(
- service.thenCompose(s -> s.refreshLeader().thenApply(v ->
s.leader())),
- willBe(follower.asPeer()))
- );
+ service.refreshLeader().thenApply(v -> service.leader()),
+ willBe(follower.asPeer())
+ ));
nodes.set(0, null).close();
- assertThat(services.get(1).thenCompose(s ->
s.run(createWriteCommand("foo"))), willThrow(TimeoutException.class));
+ assertThat(services.get(1).run(createWriteCommand("foo")),
willThrow(TimeoutException.class));
}
/**
@@ -304,20 +303,20 @@ public class ItLearnersTest extends IgniteAbstractTest {
List<Peer> serverPeers = nodesToPeers(configuration,
List.of(follower), learners);
- List<CompletableFuture<RaftGroupService>> services =
IntStream.range(0, nodes.size())
+ List<RaftGroupService> services = IntStream.range(0, nodes.size())
.mapToObj(i -> startRaftGroup(nodes.get(i),
serverPeers.get(i), configuration, new TestRaftGroupListener()))
.collect(toList());
// Wait for the leader to be elected.
services.forEach(service -> assertThat(
- service.thenCompose(s -> s.refreshLeader().thenApply(v ->
s.leader())),
- willBe(follower.asPeer()))
- );
+ service.refreshLeader().thenApply(v -> service.leader()),
+ willBe(follower.asPeer())
+ ));
nodes.set(1, null).close();
nodes.set(2, null).close();
-
assertThat(services.get(0).thenCompose(RaftGroupService::refreshLeader),
willCompleteSuccessfully());
+ assertThat(services.get(0).refreshLeader(),
willCompleteSuccessfully());
}
/**
@@ -335,35 +334,20 @@ public class ItLearnersTest extends IgniteAbstractTest {
Peer peer = configuration.peer(node.consistentId());
Peer learner = configuration.learner(node.consistentId());
- CompletableFuture<RaftGroupService> peerService = startRaftGroup(node,
peer, configuration, peerListener);
- CompletableFuture<RaftGroupService> learnerService =
startRaftGroup(node, learner, configuration, learnerListener);
+ RaftGroupService peerService = startRaftGroup(node, peer,
configuration, peerListener);
+ RaftGroupService learnerService = startRaftGroup(node, learner,
configuration, learnerListener);
- assertThat(peerService.thenCompose(
- service -> service.refreshLeader()
- .thenApply(v -> service.leader())),
- willBe(peer)
- );
- assertThat(
- // the leader is already refreshed
- peerService.thenApply(RaftGroupService::leader),
- willBe(not(learner))
- );
+ assertThat(peerService.refreshLeader(), willCompleteSuccessfully());
+ assertThat(peerService.leader(), is(peer));
+ assertThat(peerService.leader(), is(not(learner)));
- assertThat(learnerService.thenCompose(
- service -> service.refreshLeader()
- .thenApply(v -> service.leader())),
- willBe(peer)
- );
- assertThat(
- // the leader is already refreshed
- learnerService.thenApply(RaftGroupService::leader),
- willBe(not(learner))
- );
+ assertThat(learnerService.refreshLeader(), willCompleteSuccessfully());
+ assertThat(learnerService.leader(), is(peer));
+ assertThat(learnerService.leader(), is(not(learner)));
// Test writing data.
- CompletableFuture<?> writeFuture = peerService
- .thenCompose(s -> s.run(createWriteCommand("foo")).thenApply(v
-> s))
- .thenCompose(s -> s.run(createWriteCommand("bar")));
+ CompletableFuture<?> writeFuture =
peerService.run(createWriteCommand("foo"))
+ .thenRun(() -> peerService.run(createWriteCommand("bar")));
assertThat(writeFuture, willCompleteSuccessfully());
@@ -384,21 +368,19 @@ public class ItLearnersTest extends IgniteAbstractTest {
PeersAndLearners configuration = createConfiguration(followers,
List.of(learner));
- CompletableFuture<?>[] followerServices = followers.stream()
- .map(node -> startRaftGroup(node,
configuration.peer(node.consistentId()), configuration, new
TestRaftGroupListener()))
- .toArray(CompletableFuture[]::new);
+ followers.forEach(
+ node -> startRaftGroup(node,
configuration.peer(node.consistentId()), configuration, new
TestRaftGroupListener())
+ );
- assertThat(CompletableFuture.allOf(followerServices),
willCompleteSuccessfully());
var learnerListener = new TestRaftGroupListener();
- CompletableFuture<RaftGroupService> learnerService = startRaftGroup(
+ RaftGroupService learnerService = startRaftGroup(
learner, configuration.learner(learner.consistentId()),
configuration, learnerListener
);
- CompletableFuture<?> writeFuture = learnerService
- .thenCompose(s -> s.run(createWriteCommand("foo")).thenApply(v
-> s))
- .thenCompose(s -> s.run(createWriteCommand("bar")));
+ CompletableFuture<?> writeFuture =
learnerService.run(createWriteCommand("foo"))
+ .thenRun(() -> learnerService.run(createWriteCommand("bar")));
assertThat(writeFuture, willCompleteSuccessfully());
assertThat(learnerListener.storage.poll(1, TimeUnit.SECONDS),
is("foo"));
@@ -409,19 +391,20 @@ public class ItLearnersTest extends IgniteAbstractTest {
PeersAndLearners newConfiguration = createConfiguration(followers,
List.of(learner, newLearner));
- CompletableFuture<Void> changePeersFuture =
learnerService.thenCompose(s -> s.refreshAndGetLeaderWithTerm()
- .thenCompose(leaderWithTerm ->
s.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term())
- ));
+ CompletableFuture<Void> changePeersFuture =
learnerService.refreshAndGetLeaderWithTerm()
+ .thenCompose(leaderWithTerm ->
learnerService.changePeersAndLearnersAsync(newConfiguration,
leaderWithTerm.term()));
assertThat(changePeersFuture, willCompleteSuccessfully());
var newLearnerListener = new TestRaftGroupListener();
- CompletableFuture<RaftGroupService> newLearnerService = startRaftGroup(
- newLearner,
newConfiguration.learner(newLearner.consistentId()), newConfiguration,
newLearnerListener
+ startRaftGroup(
+ newLearner,
+ newConfiguration.learner(newLearner.consistentId()),
+ newConfiguration,
+ newLearnerListener
);
- assertThat(newLearnerService, willCompleteSuccessfully());
assertThat(newLearnerListener.storage.poll(10, TimeUnit.SECONDS),
is("foo"));
assertThat(newLearnerListener.storage.poll(10, TimeUnit.SECONDS),
is("bar"));
}
@@ -440,14 +423,14 @@ public class ItLearnersTest extends IgniteAbstractTest {
).collect(toList());
}
- private CompletableFuture<RaftGroupService> startRaftGroup(
+ private RaftGroupService startRaftGroup(
RaftNode node,
Peer serverPeer,
PeersAndLearners memberConfiguration,
RaftGroupListener listener
) {
try {
- return node.loza.startRaftGroupNodeAndWaitNodeReadyFuture(
+ return node.loza.startRaftGroupNodeAndWaitNodeReady(
new RaftNodeId(RAFT_GROUP_ID, serverPeer),
memberConfiguration,
listener,
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 76d8e82bac..b248cd9259 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.raft;
-import static java.util.concurrent.CompletableFuture.allOf;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -45,7 +44,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
@@ -142,14 +140,13 @@ public class ItLozaTest extends IgniteAbstractTest {
var nodeId = new RaftNodeId(groupId, configuration.peer(node.name()));
- return loza.startRaftGroupNodeAndWaitNodeReadyFuture(
+ return loza.startRaftGroupNodeAndWaitNodeReady(
nodeId,
configuration,
raftGroupListener,
RaftGroupEventsListener.noopLsnr,
groupOptionsConfigurer
- )
- .get(10, TimeUnit.SECONDS);
+ );
}
/**
@@ -254,7 +251,7 @@ public class ItLozaTest extends IgniteAbstractTest {
// Start two Raft nodes: one backed by a volatile storage and the
other - by "shared" persistent storage.
var volatileRaftNodeId = new RaftNodeId(new
TestReplicationGroupId("volatile"), peer);
- CompletableFuture<RaftGroupService> volatileServiceFuture =
loza.startRaftGroupNode(
+ RaftGroupService volatileService = loza.startRaftGroupNode(
volatileRaftNodeId,
configuration,
raftGroupListener,
@@ -267,7 +264,7 @@ public class ItLozaTest extends IgniteAbstractTest {
var persistentNodeId = new RaftNodeId(new
TestReplicationGroupId("persistent"), peer);
- CompletableFuture<RaftGroupService> persistentServiceFuture =
loza.startRaftGroupNode(
+ RaftGroupService persistentService = loza.startRaftGroupNode(
persistentNodeId,
configuration,
raftGroupListener,
@@ -277,11 +274,6 @@ public class ItLozaTest extends IgniteAbstractTest {
.serverDataPath(partitionsWorkDir.metaPath())
);
- assertThat(allOf(volatileServiceFuture, persistentServiceFuture),
willCompleteSuccessfully());
-
- RaftGroupService volatileService = volatileServiceFuture.join();
- RaftGroupService persistentService = persistentServiceFuture.join();
-
// Execute two write command in parallel. We then hope that these
commands wil be batched together.
WriteCommand cmd = testWriteCommand("foo");
@@ -339,7 +331,7 @@ public class ItLozaTest extends IgniteAbstractTest {
partitionsWorkDir.metaPath()
);
- CompletableFuture<RaftGroupService> startServiceFuture =
loza.startRaftGroupNode(
+ RaftGroupService service = loza.startRaftGroupNode(
nodeId,
configuration,
raftGroupListener,
@@ -347,8 +339,6 @@ public class ItLozaTest extends IgniteAbstractTest {
null,
configurer
);
- assertThat(startServiceFuture, willCompleteSuccessfully());
- RaftGroupService service = startServiceFuture.join();
assertThat(service.run(testWriteCommand("foo")),
willCompleteSuccessfully());
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index 124dbe4bec..e17571e88a 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -20,15 +20,15 @@ package org.apache.ignite.internal.raft;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -101,11 +101,7 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
.map(TestNode::name)
.collect(collectingAndThen(toSet(),
PeersAndLearners::fromConsistentIds));
- CompletableFuture<?>[] svcFutures = nodes.stream()
- .map(node -> node.startRaftGroup(configuration))
- .toArray(CompletableFuture[]::new);
-
- assertThat(CompletableFuture.allOf(svcFutures),
willCompleteSuccessfully());
+ nodes.forEach(node -> node.startRaftGroup(configuration));
}
private TestNode startNode(TestInfo testInfo) {
@@ -127,11 +123,9 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
@Test
@Timeout(20)
public void testTransferLeadership() {
- assertThat(nodes.get(0).raftGroupService, willCompleteSuccessfully());
+ assertThat(nodes.get(0).raftGroupService.refreshLeader(),
willCompleteSuccessfully());
- Peer leader = nodes.get(0).raftGroupService
- .thenCompose(service -> service.refreshLeader().thenApply(v ->
service.leader()))
- .join();
+ Peer leader = nodes.get(0).raftGroupService.leader();
TestNode oldLeaderNode = nodes.stream()
.filter(node -> node.name().equals(leader.consistentId()))
@@ -146,16 +140,16 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
Peer expectedNewLeaderPeer = new Peer(newLeaderNode.name());
CompletableFuture<Void> transferLeadership =
oldLeaderNode.raftGroupService
- .thenCompose(service ->
service.transferLeadership(expectedNewLeaderPeer));
+ .transferLeadership(expectedNewLeaderPeer);
assertThat(transferLeadership, willCompleteSuccessfully());
-
assertThat(oldLeaderNode.raftGroupService.thenApply(RaftGroupService::leader),
willBe(expectedNewLeaderPeer));
+ assertThat(oldLeaderNode.raftGroupService.leader(),
is(expectedNewLeaderPeer));
assertTrue(waitForCondition(() -> {
-
assertThat(newLeaderNode.raftGroupService.thenCompose(RaftGroupService::refreshLeader),
willCompleteSuccessfully());
+ assertThat(newLeaderNode.raftGroupService.refreshLeader(),
willCompleteSuccessfully());
- return
expectedNewLeaderPeer.equals(newLeaderNode.raftGroupService.join().leader());
+ return
expectedNewLeaderPeer.equals(newLeaderNode.raftGroupService.leader());
}, 10_000));
}
@@ -174,11 +168,7 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(newFollowersConfig, newLearnersConfig);
// Start Raft groups on the new nodes with the new configuration.
- CompletableFuture<?>[] startedServices =
Stream.concat(newFollowers.stream(), newLearners.stream())
- .map(node -> node.startRaftGroup(configuration))
- .toArray(CompletableFuture[]::new);
-
- assertThat(CompletableFuture.allOf(startedServices),
willCompleteSuccessfully());
+ Stream.concat(newFollowers.stream(),
newLearners.stream()).forEach(node -> node.startRaftGroup(configuration));
// Change Raft configuration and wait until it's applied.
var configurationComplete = new CountDownLatch(1);
@@ -189,10 +179,10 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
return null;
}).when(eventsListener).onNewPeersConfigurationApplied(any());
- CompletableFuture<Void> changePeersFuture =
nodes.get(0).raftGroupService
- .thenCompose(service -> service.refreshAndGetLeaderWithTerm()
- .thenCompose(l ->
service.changePeersAndLearnersAsync(configuration, l.term()))
- );
+ RaftGroupService service0 = nodes.get(0).raftGroupService;
+
+ CompletableFuture<Void> changePeersFuture =
service0.refreshAndGetLeaderWithTerm()
+ .thenCompose(l ->
service0.changePeersAndLearnersAsync(configuration, l.term()));
assertThat(changePeersFuture, willCompleteSuccessfully());
@@ -201,16 +191,16 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
// Check that configuration is the same on all nodes.
for (TestNode node : nodes) {
assertThat(
- node.raftGroupService.thenCompose(service ->
service.refreshMembers(true)),
+ node.raftGroupService.refreshMembers(true),
willCompleteSuccessfully()
);
assertThat(
- node.raftGroupService.thenApply(RaftGroupService::peers),
- will(containsInAnyOrder(configuration.peers().toArray()))
+ node.raftGroupService.peers(),
+ containsInAnyOrder(configuration.peers().toArray())
);
assertThat(
-
node.raftGroupService.thenApply(RaftGroupService::learners),
-
will(containsInAnyOrder(configuration.learners().toArray()))
+ node.raftGroupService.learners(),
+ containsInAnyOrder(configuration.learners().toArray())
);
}
}
@@ -218,7 +208,7 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
private class TestNode {
private final ClusterService clusterService;
private final Loza loza;
- private CompletableFuture<RaftGroupService> raftGroupService;
+ private RaftGroupService raftGroupService;
private final LogStorageFactory partitionsLogStorageFactory;
private final ComponentWorkingDir partitionsWorkDir;
@@ -246,7 +236,7 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
assertThat(startAsync(new ComponentContext(), clusterService,
partitionsLogStorageFactory, loza), willCompleteSuccessfully());
}
- CompletableFuture<RaftGroupService> startRaftGroup(PeersAndLearners
configuration) {
+ void startRaftGroup(PeersAndLearners configuration) {
String nodeName =
clusterService.topologyService().localMember().name();
Peer serverPeer = configuration.peer(nodeName);
@@ -254,7 +244,7 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
var nodeId = new RaftNodeId(RAFT_GROUP_NAME, serverPeer == null ?
configuration.learner(nodeName) : serverPeer);
try {
- raftGroupService =
loza.startRaftGroupNodeAndWaitNodeReadyFuture(
+ raftGroupService = loza.startRaftGroupNodeAndWaitNodeReady(
nodeId,
configuration,
mock(RaftGroupListener.class),
@@ -262,17 +252,15 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageFactory,
partitionsWorkDir.metaPath())
);
} catch (NodeStoppingException e) {
- return CompletableFuture.failedFuture(e);
+ fail(e);
}
-
- return raftGroupService;
}
void beforeNodeStop() throws Exception {
Stream<AutoCloseable> shutdownService = Stream.of(
raftGroupService == null
? null
- : (AutoCloseable) () -> raftGroupService.get(1,
TimeUnit.SECONDS).shutdown()
+ : (AutoCloseable) () -> raftGroupService.shutdown()
);
Stream<AutoCloseable> stopRaftGroups =
loza.localNodes().stream().map(id -> () -> loza.stopRaftNode(id));
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index d6170b0032..f840e682c5 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -163,7 +163,7 @@ public class ItTruncateSuffixAndRestartTest extends
BaseIgniteAbstractTest {
final Loza raftMgr;
- private @Nullable CompletableFuture<RaftGroupService> serviceFuture;
+ private @Nullable RaftGroupService raftGroupService;
private TestRaftGroupListener raftGroupListener;
@@ -207,7 +207,7 @@ public class ItTruncateSuffixAndRestartTest extends
BaseIgniteAbstractTest {
raftGroupListener = new TestRaftGroupListener();
try {
- serviceFuture = raftMgr.startRaftGroupNode(
+ raftGroupService = raftMgr.startRaftGroupNode(
new RaftNodeId(GROUP_ID, new Peer(nodeName)),
raftGroupConfiguration,
raftGroupListener,
@@ -222,15 +222,16 @@ public class ItTruncateSuffixAndRestartTest extends
BaseIgniteAbstractTest {
}
RaftGroupService getService() {
- assertNotNull(serviceFuture);
- assertThat(serviceFuture, willCompleteSuccessfully());
+ assertNotNull(raftGroupService);
- return serviceFuture.join();
+ return raftGroupService;
}
void stopService() {
- if (serviceFuture != null) {
- serviceFuture = null;
+ if (raftGroupService != null) {
+ raftGroupService.shutdown();
+
+ raftGroupService = null;
try {
raftMgr.stopRaftNode(new RaftNodeId(GROUP_ID, new
Peer(nodeName)));
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index a4ef357037..bb5828e3a1 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -146,14 +146,12 @@ class ItSimpleCounterServerTest extends
RaftServerAbstractTest {
executor = new ScheduledThreadPoolExecutor(20, new
NamedThreadFactory(Loza.CLIENT_POOL_NAME, logger()));
client1 = RaftGroupServiceImpl
- .start(COUNTER_GROUP_ID_0, clientNode1, FACTORY,
raftConfiguration, memberConfiguration, false, executor, cmdMarshaller)
- .get(3, TimeUnit.SECONDS);
+ .start(COUNTER_GROUP_ID_0, clientNode1, FACTORY,
raftConfiguration, memberConfiguration, executor, cmdMarshaller);
ClusterService clientNode2 = clusterService(PORT + 2, List.of(addr),
true);
client2 = RaftGroupServiceImpl
- .start(COUNTER_GROUP_ID_1, clientNode2, FACTORY,
raftConfiguration, memberConfiguration, false, executor, cmdMarshaller)
- .get(3, TimeUnit.SECONDS);
+ .start(COUNTER_GROUP_ID_1, clientNode2, FACTORY,
raftConfiguration, memberConfiguration, executor, cmdMarshaller);
assertTrue(waitForTopology(service, 3, 10_000));
assertTrue(waitForTopology(clientNode1, 3, 10_000));
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
index 143010a075..a1aa7edb73 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
@@ -235,8 +235,7 @@ public abstract class JraftAbstractTest extends
RaftServerAbstractTest {
var commandsMarshaller = new
ThreadLocalOptimizedMarshaller(clientNode.serializationRegistry());
RaftGroupService client = RaftGroupServiceImpl
- .start(groupId, clientNode, FACTORY, raftConfiguration,
configuration, false, executor, commandsMarshaller)
- .get(3, TimeUnit.SECONDS);
+ .start(groupId, clientNode, FACTORY, raftConfiguration,
configuration, executor, commandsMarshaller);
clients.add(client);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index f0e01160fa..9dda4e6054 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -194,7 +194,7 @@ public class Loza implements RaftManager {
}
@Override
- public <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNode(
+ public <T extends RaftGroupService> T startRaftGroupNode(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -219,7 +219,7 @@ public class Loza implements RaftManager {
* @param groupOptions Options to apply to the group.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- public CompletableFuture<RaftGroupService> startRaftGroupNode(
+ public RaftGroupService startRaftGroupNode(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -240,7 +240,7 @@ public class Loza implements RaftManager {
* @param raftServiceFactory If not null, used for creation of raft group
service.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- public <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNode(
+ public <T extends RaftGroupService> T startRaftGroupNode(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -287,7 +287,7 @@ public class Loza implements RaftManager {
}
@Override
- public CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ public RaftGroupService startRaftGroupNodeAndWaitNodeReady(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -302,7 +302,7 @@ public class Loza implements RaftManager {
}
@Override
- public CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ public RaftGroupService startRaftGroupNodeAndWaitNodeReady(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -310,7 +310,7 @@ public class Loza implements RaftManager {
RaftNodeDisruptorConfiguration disruptorConfiguration,
RaftGroupOptionsConfigurer groupOptionsConfigurer
) throws NodeStoppingException {
- return startRaftGroupNodeAndWaitNodeReadyFuture(
+ return startRaftGroupNodeAndWaitNodeReady(
nodeId,
configuration,
lsnr,
@@ -322,7 +322,7 @@ public class Loza implements RaftManager {
}
@Override
- public <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ public <T extends RaftGroupService> T startRaftGroupNodeAndWaitNodeReady(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -358,7 +358,7 @@ public class Loza implements RaftManager {
}
@Override
- public CompletableFuture<RaftGroupService>
startRaftGroupService(ReplicationGroupId groupId, PeersAndLearners
configuration)
+ public RaftGroupService startRaftGroupService(ReplicationGroupId groupId,
PeersAndLearners configuration)
throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
@@ -373,7 +373,7 @@ public class Loza implements RaftManager {
}
@Override
- public <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupService(
+ public <T extends RaftGroupService> T startRaftGroupService(
ReplicationGroupId groupId,
PeersAndLearners configuration,
RaftServiceFactory<T> factory,
@@ -424,7 +424,7 @@ public class Loza implements RaftManager {
}
}
- private <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNodeInternal(
+ private <T extends RaftGroupService> T startRaftGroupNodeInternal(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -437,7 +437,7 @@ public class Loza implements RaftManager {
Marshaller cmdMarshaller =
requireNonNullElse(groupOptions.commandsMarshaller(),
opts.getCommandsMarshaller());
return raftServiceFactory == null
- ? (CompletableFuture<T>)
startRaftGroupServiceInternal(nodeId.groupId(), configuration, cmdMarshaller)
+ ? (T) startRaftGroupServiceInternal(nodeId.groupId(),
configuration, cmdMarshaller)
: raftServiceFactory.startRaftGroupService(nodeId.groupId(),
configuration, raftConfiguration, executor, cmdMarshaller);
}
@@ -462,7 +462,7 @@ public class Loza implements RaftManager {
}
}
- private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal(
+ private RaftGroupService startRaftGroupServiceInternal(
ReplicationGroupId grpId,
PeersAndLearners membersConfiguration,
Marshaller commandsMarshaller
@@ -473,7 +473,6 @@ public class Loza implements RaftManager {
FACTORY,
raftConfiguration,
membersConfiguration,
- true,
executor,
commandsMarshaller
);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index e6f6a8eec7..04fc475234 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -161,17 +161,15 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
* @param factory Message factory.
* @param configuration Raft configuration.
* @param membersConfiguration Raft members configuration.
- * @param getLeader {@code True} to get the group's leader upon service
creation.
* @param executor Executor for retrying requests.
- * @return Future representing pending completion of the operation.
+ * @return A new Raft group service.
*/
- public static CompletableFuture<RaftGroupService> start(
+ public static RaftGroupService start(
ReplicationGroupId groupId,
ClusterService cluster,
RaftMessagesFactory factory,
RaftConfiguration configuration,
PeersAndLearners membersConfiguration,
- boolean getLeader,
ScheduledExecutorService executor,
Marshaller commandsMarshaller
) {
@@ -207,27 +205,7 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
);
}
- getLeader = false;
-
- if (!getLeader) {
- return completedFuture(service);
- }
-
- return service.refreshLeader().handle((unused, throwable) -> {
- if (throwable != null) {
- if (throwable.getCause() instanceof TimeoutException) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to refresh a leader [groupId={}]",
groupId);
- }
- } else {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Failed to refresh a leader [groupId={}]",
throwable, groupId);
- }
- }
- }
-
- return service;
- });
+ return service;
}
@Override
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 6058bc528f..8e8652bb33 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -89,7 +89,7 @@ public class LozaTest extends IgniteAbstractTest {
assertThrows(
NodeStoppingException.class,
- () -> loza.startRaftGroupNodeAndWaitNodeReadyFuture(
+ () -> loza.startRaftGroupNodeAndWaitNodeReady(
new RaftNodeId(raftGroupId, serverPeer),
configuration,
null,
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index 38ee0fda36..863c74b4e9 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -628,13 +628,9 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
var commandsSerializer = new
ThreadLocalOptimizedMarshaller(cluster.serializationRegistry());
- CompletableFuture<RaftGroupService> service =
RaftGroupServiceImpl.start(
- TEST_GRP, cluster, FACTORY, raftConfiguration,
memberConfiguration, false, executor, commandsSerializer
+ return RaftGroupServiceImpl.start(
+ TEST_GRP, cluster, FACTORY, raftConfiguration,
memberConfiguration, executor, commandsSerializer
);
-
- assertThat(service, willCompleteSuccessfully());
-
- return service.join();
}
private RaftGroupService startRaftGroupServiceWithRefreshLeader(List<Peer>
peers) {
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 929f65496e..8728883e1a 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -183,30 +183,35 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
* @param factory Message factory.
* @param raftConfiguration RAFT configuration.
* @param configuration Group configuration.
- * @param getLeader True to get the group's leader upon service creation.
* @param executor RPC executor.
* @param logicalTopologyService Logical topology service.
* @param notifyOnSubscription Whether to notify callback after
subscription to pass the current leader and term into it, even
* if the leader did not change in that moment (see {@link
#subscribeLeader}).
* @param cmdMarshaller Marshaller that should be used to
serialize/deserialize commands.
- * @return Future to create a raft client.
+ * @return New Raft client.
*/
- public static CompletableFuture<TopologyAwareRaftGroupService> start(
+ public static TopologyAwareRaftGroupService start(
ReplicationGroupId groupId,
ClusterService cluster,
RaftMessagesFactory factory,
RaftConfiguration raftConfiguration,
PeersAndLearners configuration,
- boolean getLeader,
ScheduledExecutorService executor,
LogicalTopologyService logicalTopologyService,
RaftGroupEventsClientListener eventsClientListener,
boolean notifyOnSubscription,
Marshaller cmdMarshaller
) {
- return RaftGroupServiceImpl.start(groupId, cluster, factory,
raftConfiguration, configuration, getLeader, executor, cmdMarshaller)
- .thenApply(raftGroupService -> new
TopologyAwareRaftGroupService(cluster, factory, executor, raftConfiguration,
- raftGroupService, logicalTopologyService,
eventsClientListener, notifyOnSubscription));
+ return new TopologyAwareRaftGroupService(
+ cluster,
+ factory,
+ executor,
+ raftConfiguration,
+ RaftGroupServiceImpl.start(groupId, cluster, factory,
raftConfiguration, configuration, executor, cmdMarshaller),
+ logicalTopologyService,
+ eventsClientListener,
+ notifyOnSubscription
+ );
}
/**
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
index c4c4020f19..785a76811c 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.raft.client;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.network.ClusterService;
@@ -63,7 +62,7 @@ public class TopologyAwareRaftGroupServiceFactory implements
RaftServiceFactory<
/** {@inheritDoc} */
@Override
- public CompletableFuture<TopologyAwareRaftGroupService>
startRaftGroupService(
+ public TopologyAwareRaftGroupService startRaftGroupService(
ReplicationGroupId groupId,
PeersAndLearners peersAndLearners,
RaftConfiguration raftConfiguration,
@@ -76,7 +75,6 @@ public class TopologyAwareRaftGroupServiceFactory implements
RaftServiceFactory<
raftMessagesFactory,
raftConfiguration,
peersAndLearners,
- true,
raftClientExecutor,
logicalTopologyService,
eventsClientListener,
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b688847643..bed206e337 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -43,7 +43,6 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -65,7 +64,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
-import org.apache.ignite.internal.close.ManuallyCloseable;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureContext;
@@ -193,9 +191,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Replicas. */
private final ConcurrentHashMap<ReplicationGroupId,
CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
- /** Futures for stopping raft nodes if the corresponding replicas weren't
started. */
- private final Map<RaftNodeId,
CompletableFuture<TopologyAwareRaftGroupService>> raftClientsFutures = new
ConcurrentHashMap<>();
-
private final ClockService clockService;
/** Scheduled executor for idle safe time sync. */
@@ -635,75 +630,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
}
- private CompletableFuture<Replica> startReplicaInternal(
- RaftGroupEventsListener raftGroupEventsListener,
- RaftGroupListener raftGroupListener,
- boolean isVolatileStorage,
- @Nullable SnapshotStorageFactory snapshotStorageFactory,
- Function<RaftGroupService, ReplicaListener> createListener,
- PendingComparableValuesTracker<Long, Void> storageIndexTracker,
- ReplicationGroupId replicaGrpId,
- PeersAndLearners newConfiguration
- ) throws NodeStoppingException {
- RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
-
- RaftGroupOptions groupOptions =
groupOptionsForPartition(isVolatileStorage, snapshotStorageFactory);
-
- // TODO: move into {@method Replica#shutdown}
https://issues.apache.org/jira/browse/IGNITE-22372
- // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
- CompletableFuture<Replica> replicaFuture = ((Loza) raftManager)
- .startRaftGroupNode(
- raftNodeId,
- newConfiguration,
- raftGroupListener,
- raftGroupEventsListener,
- groupOptions,
- raftGroupServiceFactory
- )
- .thenApplyAsync(raftClient -> {
- LOG.info("Replica is about to start
[replicationGroupId={}].", replicaGrpId);
-
- ReplicaListener replicaListener =
createListener.apply(raftClient);
-
- ClusterNode localNode =
clusterNetSvc.topologyService().localMember();
-
- return new ReplicaImpl(
- replicaGrpId,
- replicaListener,
- storageIndexTracker,
- localNode,
- executor,
- placementDriver,
- clockService,
- replicaStateManager::reserveReplica
- );
- }, replicasCreationExecutor)
- .thenComposeAsync(newReplica -> replicas.compute(replicaGrpId,
(k, existingReplicaFuture) -> {
- if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
- assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
- LOG.info("Replica is started
[replicationGroupId={}].", replicaGrpId);
-
- return completedFuture(newReplica);
- } else {
- LOG.info("Replica is started, existing replica waiter
was completed [replicationGroupId={}].", replicaGrpId);
-
- existingReplicaFuture.complete(newReplica);
-
- return existingReplicaFuture;
- }
- }), replicasCreationExecutor);
-
- var eventParams = new LocalReplicaEventParameters(replicaGrpId);
-
- return fireEvent(AFTER_REPLICA_STARTED, eventParams)
- .exceptionally(e -> {
- LOG.error("Error when notifying about
AFTER_REPLICA_STARTED event.", e);
-
- return null;
- })
- .thenCompose(v -> replicaFuture);
- }
-
/**
* Creates and starts a new replica.
*
@@ -735,14 +661,23 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
try {
return startReplicaInternal(
- raftGroupEventsListener,
+ replicaGrpId,
+ snapshotStorageFactory,
+ newConfiguration,
raftGroupListener,
+ raftGroupEventsListener,
isVolatileStorage,
- snapshotStorageFactory,
- createListener,
- storageIndexTracker,
- replicaGrpId,
- newConfiguration);
+ (raftClient) -> new ReplicaImpl(
+ replicaGrpId,
+ createListener.apply(raftClient),
+ storageIndexTracker,
+ clusterNetSvc.topologyService().localMember(),
+ executor,
+ placementDriver,
+ clockService,
+ replicaStateManager::reserveReplica
+ )
+ );
} finally {
busyLock.leaveBusy();
}
@@ -762,7 +697,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*/
public CompletableFuture<Replica> startReplica(
ReplicationGroupId replicaGrpId,
- Function<RaftGroupService, ReplicaListener> listener,
+ Function<RaftGroupService, ReplicaListener> createListener,
SnapshotStorageFactory snapshotStorageFactory,
PeersAndLearners newConfiguration,
RaftGroupListener raftGroupListener,
@@ -774,34 +709,40 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
try {
- return internalStartZoneReplica(
+ return startReplicaInternal(
replicaGrpId,
- listener,
snapshotStorageFactory,
newConfiguration,
raftGroupListener,
raftGroupEventsListener,
- busyLock
+ false,
+ (raftClient) -> new ZonePartitionReplicaImpl(
+ replicaGrpId,
+ createListener.apply(raftClient),
+ raftClient
+ )
);
} finally {
busyLock.leaveBusy();
}
}
- private CompletableFuture<Replica> internalStartZoneReplica(
+ private CompletableFuture<Replica> startReplicaInternal(
ReplicationGroupId replicaGrpId,
- Function<RaftGroupService, ReplicaListener> listener,
- SnapshotStorageFactory snapshotStorageFactory,
+ @Nullable SnapshotStorageFactory snapshotStorageFactory,
PeersAndLearners newConfiguration,
RaftGroupListener raftGroupListener,
RaftGroupEventsListener raftGroupEventsListener,
- IgniteSpinBusyLock busyLock
+ boolean isVolatileStorage,
+ Function<TopologyAwareRaftGroupService, Replica> createReplica
) throws NodeStoppingException {
- RaftGroupOptions groupOptions = groupOptionsForPartition(false,
snapshotStorageFactory);
-
RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
- CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut =
((Loza) raftManager).startRaftGroupNode(
+ RaftGroupOptions groupOptions =
groupOptionsForPartition(isVolatileStorage, snapshotStorageFactory);
+
+ // TODO: move into {@method Replica#shutdown}
https://issues.apache.org/jira/browse/IGNITE-22372
+ // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
+ TopologyAwareRaftGroupService raftClient = ((Loza)
raftManager).startRaftGroupNode(
raftNodeId,
newConfiguration,
raftGroupListener,
@@ -810,51 +751,34 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
raftGroupServiceFactory
);
- raftClientsFutures.put(raftNodeId, newRaftClientFut);
+ LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
- return newRaftClientFut.thenComposeAsync(raftClient -> {
- if (!busyLock.enterBusy()) {
- return failedFuture(new NodeStoppingException());
- }
-
- try {
- raftClientsFutures.remove(raftNodeId);
-
- LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
+ Replica newReplica = createReplica.apply(raftClient);
- Replica newReplica = new ZonePartitionReplicaImpl(
- replicaGrpId,
- listener.apply(raftClient),
- raftClient
- );
+ CompletableFuture<Replica> newReplicaFuture =
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
+ if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
+ assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
+ LOG.info("Replica is started [replicationGroupId={}].",
replicaGrpId);
- CompletableFuture<Replica> replicaFuture =
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
- if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
- assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
- LOG.info("Replica is started
[replicationGroupId={}].", replicaGrpId);
+ return completedFuture(newReplica);
+ } else {
+ LOG.info("Replica is started, existing replica waiter was
completed [replicationGroupId={}].", replicaGrpId);
- return completedFuture(newReplica);
- } else {
- existingReplicaFuture.complete(newReplica);
- LOG.info("Replica is started, existing replica waiter
was completed [replicationGroupId={}].", replicaGrpId);
+ existingReplicaFuture.complete(newReplica);
- return existingReplicaFuture;
- }
- });
+ return existingReplicaFuture;
+ }
+ });
- var eventParams = new
LocalReplicaEventParameters(replicaGrpId);
+ var eventParams = new LocalReplicaEventParameters(replicaGrpId);
- return fireEvent(AFTER_REPLICA_STARTED, eventParams)
- .exceptionally(e -> {
- LOG.error("Error when notifying about
AFTER_REPLICA_STARTED event.", e);
+ return fireEvent(AFTER_REPLICA_STARTED, eventParams)
+ .exceptionally(e -> {
+ LOG.error("Error when notifying about
AFTER_REPLICA_STARTED event.", e);
- return null;
- })
- .thenCompose(v -> replicaFuture);
- } finally {
- busyLock.leaveBusy();
- }
- }, executor);
+ return null;
+ })
+ .thenCompose(v -> newReplicaFuture);
}
/**
@@ -1033,34 +957,14 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds,
TimeUnit.SECONDS);
shutdownAndAwaitTermination(replicasCreationExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS);
- // A collection of lambdas with raft entities closing and replicas
completion with NodeStoppingException.
- Collection<ManuallyCloseable> closeables = new
ArrayList<>(raftClientsFutures.size() + 1);
-
- // Sequence of raft-entities stopping processes: if waiting
raft-client future completion finishes with an exception, then we
- // don't trying to stop raft-node.
- raftClientsFutures.forEach((raftNodeId, raftClientFuture) ->
closeables.add(() -> {
- raftClientFuture.get(shutdownTimeoutSeconds, TimeUnit.SECONDS);
-
- try {
- raftManager.stopRaftNode(raftNodeId);
- } catch (NodeStoppingException e) {
- throw new AssertionError("Raft node is stopping [raftNodeId="
+ raftNodeId
- + "], but it's abnormal, because Raft Manager must
stop strictly after Replica Manager.", e);
- }
- }));
-
- // The last is completion of replica futures with mandatory check that
all futures are complete before adding NodeStoppingException.
- // We couldn't do it in finally block because thus we're loosing
AssertionError and mandatory assert doesn't matter then.
- closeables.add(() -> {
- assert
replicas.values().stream().noneMatch(CompletableFuture::isDone)
- : "There are replicas alive [replicas="
- + replicas.entrySet().stream().filter(e ->
e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
-
- replicas.values().forEach(replicaFuture ->
replicaFuture.completeExceptionally(new NodeStoppingException()));
- });
-
try {
- IgniteUtils.closeAllManually(closeables);
+ IgniteUtils.closeAllManually(() -> {
+ assert
replicas.values().stream().noneMatch(CompletableFuture::isDone)
+ : "There are replicas alive [replicas="
+ + replicas.entrySet().stream().filter(e ->
e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
+
+ replicas.values().forEach(replicaFuture ->
replicaFuture.completeExceptionally(new NodeStoppingException()));
+ });
} catch (Exception e) {
return failedFuture(e);
}
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index e3b402fbf9..96f22952f7 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.replicator;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
@@ -185,7 +184,7 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
any(RaftGroupOptions.class),
any(TopologyAwareRaftGroupServiceFactory.class))
)
- .thenReturn(completedFuture(raftGroupService));
+ .thenReturn(raftGroupService);
when(createReplicaListener.notify(any())).thenReturn(falseCompletedFuture());
when(removeReplicaListener.notify(any())).thenReturn(falseCompletedFuture());
diff --git
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
index 192f69d542..6cd704b899 100644
---
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
+++
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
@@ -522,13 +522,12 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
FACTORY,
raftConfiguration,
peersAndLearners(clusterServices, isServerAddress, nodes),
- true,
executor,
logicalTopologyService,
eventsClientListener,
notifyOnSubscription,
commandsMarshaller
- ).join();
+ );
}
private static PeersAndLearners peersAndLearners(
diff --git
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
index a0c98f0acb..77dd33c3c6 100644
---
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
+++
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java
@@ -240,8 +240,7 @@ class ItMetastorageGroupDisasterRecoveryTest extends
ItSystemGroupDisasterRecove
private static RaftGroupService metastorageGroupClient(IgniteImpl ignite)
throws NodeStoppingException, ExecutionException,
InterruptedException, TimeoutException {
PeersAndLearners config =
PeersAndLearners.fromConsistentIds(Set.of(ignite.name()));
- CompletableFuture<RaftGroupService> future =
ignite.raftManager().startRaftGroupService(MetastorageGroupId.INSTANCE, config);
- return future.get(10, SECONDS);
+ return
ignite.raftManager().startRaftGroupService(MetastorageGroupId.INSTANCE, config);
}
private static String leaderName(RaftGroupService mgClient0) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 07c011786e..5ad2fa9614 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -187,7 +187,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
any(RaftGroupOptions.class),
any(TopologyAwareRaftGroupServiceFactory.class))
)
- .thenReturn(completedFuture(raftClient));
+ .thenReturn(raftClient);
requestsExecutor = Executors.newFixedThreadPool(
5,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index 62b223b970..927ad11162 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -28,8 +28,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -192,14 +190,9 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
}
private void startCluster(Map<String, PartialNode> cluster) throws
Exception {
- Collection<CompletableFuture<Void>> startingFutures = new
ArrayList<>(cluster.size());
for (PartialNode node : cluster.values()) {
- startingFutures.add(node.start());
+ node.start();
}
-
- CompletableFuture<Void> clusterReadyFuture =
CompletableFuture.allOf(startingFutures.toArray(CompletableFuture[]::new));
-
- assertThat(clusterReadyFuture, willCompleteSuccessfully());
}
/**
@@ -265,7 +258,7 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
this.nodeName = nodeName;
}
- CompletableFuture<Void> start() throws Exception {
+ void start() throws Exception {
clusterService = ClusterServiceTestUtils.clusterService(nodeName,
port.getAndIncrement(), NODE_FINDER);
assertThat(clusterService.startAsync(new ComponentContext()),
willCompleteSuccessfully());
@@ -292,32 +285,28 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
TxManager txManagerMock = mock(TxManager.class);
- return raftManager.startRaftGroupNode(
- new RaftNodeId(GROUP_ID, new Peer(nodeName)),
- fromConsistentIds(cluster.keySet()),
- new PartitionListener(
- txManagerMock,
- mock(PartitionDataStorage.class),
- mock(StorageUpdateHandler.class),
- mock(TxStateStorage.class),
- mock(PendingComparableValuesTracker.class),
- mock(PendingComparableValuesTracker.class),
- mock(CatalogService.class),
- mock(SchemaRegistry.class),
- clockService,
- mock(IndexMetaStorage.class),
-
clusterService.topologyService().localMember().id(),
-
mock(MinimumRequiredTimeCollectorService.class)
- ),
- RaftGroupEventsListener.noopLsnr,
- RaftGroupOptions.defaults()
- .serverDataPath(workingDir.metaPath())
-
.setLogStorageFactory(partitionsLogStorageFactory)
- )
- .thenApply(raftClient -> {
- this.raftClient = raftClient;
- return null;
- });
+ this.raftClient = raftManager.startRaftGroupNode(
+ new RaftNodeId(GROUP_ID, new Peer(nodeName)),
+ fromConsistentIds(cluster.keySet()),
+ new PartitionListener(
+ txManagerMock,
+ mock(PartitionDataStorage.class),
+ mock(StorageUpdateHandler.class),
+ mock(TxStateStorage.class),
+ mock(PendingComparableValuesTracker.class),
+ mock(PendingComparableValuesTracker.class),
+ mock(CatalogService.class),
+ mock(SchemaRegistry.class),
+ clockService,
+ mock(IndexMetaStorage.class),
+
clusterService.topologyService().localMember().id(),
+ mock(MinimumRequiredTimeCollectorService.class)
+ ),
+ RaftGroupEventsListener.noopLsnr,
+ RaftGroupOptions.defaults()
+ .serverDataPath(workingDir.metaPath())
+ .setLogStorageFactory(partitionsLogStorageFactory)
+ );
}
void stop() throws Exception {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index f3adb8f59d..287ebccb58 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
@@ -281,7 +280,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
RaftGroupService raftGrpSrvcMock =
mock(TopologyAwareRaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer("node0"));
- when(rm.startRaftGroupService(any(), any(), any(),
any())).thenAnswer(mock -> completedFuture(raftGrpSrvcMock));
+ when(rm.startRaftGroupService(any(), any(), any(),
any())).thenAnswer(mock -> raftGrpSrvcMock);
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
when(clusterService.topologyService()).thenReturn(topologyService);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 58c3e53e18..13e34a293b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -350,7 +350,7 @@ public class TableManagerTest extends IgniteAbstractTest {
@Test
public void testPreconfiguredTable() throws Exception {
when(rm.startRaftGroupService(any(), any(), any(), any()))
- .thenAnswer(mock ->
completedFuture(mock(TopologyAwareRaftGroupService.class)));
+ .thenAnswer(mock -> mock(TopologyAwareRaftGroupService.class));
TableManager tableManager = createTableManager(tblManagerFut);
@@ -661,7 +661,7 @@ public class TableManagerTest extends IgniteAbstractTest {
*/
private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean
isTxStorageUnderRebalance) throws NodeStoppingException {
when(rm.startRaftGroupService(any(), any(), any(), any()))
- .thenAnswer(mock ->
completedFuture(mock(TopologyAwareRaftGroupService.class)));
+ .thenAnswer(mock -> mock(TopologyAwareRaftGroupService.class));
createZone(1, 1);