This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4d2976dd25 IGNITE-18374 Remove RaftManager#prepareRaftGroup method (#1438)
4d2976dd25 is described below
commit 4d2976dd25787d2ee047672ee4c94b297244de31
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Dec 14 13:05:23 2022 +0300
IGNITE-18374 Remove RaftManager#prepareRaftGroup method (#1438)
---
.../management/raft/ItCmgRaftServiceTest.java | 27 ++++++---
.../management/ClusterManagementGroupManager.java | 20 ++++---
.../client/ItMetaStorageServiceTest.java | 27 ++++++---
.../internal/metastorage/MetaStorageManager.java | 44 ++++++++------
.../apache/ignite/internal/raft/RaftManager.java | 41 +++----------
.../ignite/internal/raft/ItLearnersTest.java | 10 ++--
.../apache/ignite/internal/raft/ItLozaTest.java | 13 ++--
.../internal/raft/ItRaftGroupServiceTest.java | 14 ++---
.../java/org/apache/ignite/internal/raft/Loza.java | 70 ++++------------------
.../org/apache/ignite/internal/raft/LozaTest.java | 5 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
.../distributed/ItTxDistributedTestSingleNode.java | 11 ++--
12 files changed, 120 insertions(+), 164 deletions(-)
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index a17c26388d..8ee84e07b3 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -21,7 +21,6 @@ import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.cluster.management.ClusterState.clusterState;
import static
org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
-import static
org.apache.ignite.internal.cluster.management.CmgGroupId.INSTANCE;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -43,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
+import org.apache.ignite.internal.cluster.management.CmgGroupId;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
import
org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
@@ -54,8 +54,11 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -117,12 +120,20 @@ public class ItCmgRaftServiceTest {
.map(ClusterNode::name)
.collect(collectingAndThen(toSet(),
PeersAndLearners::fromConsistentIds));
- CompletableFuture<RaftGroupService> raftService =
raftManager.prepareRaftGroup(
- INSTANCE,
-
configuration.peer(clusterService.topologyService().localMember().name()),
- configuration,
- () -> new CmgRaftGroupListener(raftStorage, new
LogicalTopologyImpl(raftStorage), term -> {})
- );
+ Peer serverPeer = configuration.peer(localMember().name());
+
+ CompletableFuture<RaftGroupService> raftService;
+
+ if (serverPeer == null) {
+ raftService =
raftManager.startRaftGroupService(CmgGroupId.INSTANCE, configuration);
+ } else {
+ raftService = raftManager.startRaftGroupNode(
+ new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
+ configuration,
+ new CmgRaftGroupListener(raftStorage, new
LogicalTopologyImpl(raftStorage), term -> {}),
+ RaftGroupEventsListener.noopLsnr
+ );
+ }
assertThat(raftService, willCompleteSuccessfully());
@@ -133,7 +144,7 @@ public class ItCmgRaftServiceTest {
}
void beforeNodeStop() throws NodeStoppingException {
- raftManager.stopRaftNodes(INSTANCE);
+ raftManager.stopRaftNodes(CmgGroupId.INSTANCE);
raftManager.beforeNodeStop();
clusterService.beforeNodeStop();
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index dcbae3f7aa..b46c9cbb99 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -22,7 +22,6 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static
org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
-import static
org.apache.ignite.internal.cluster.management.CmgGroupId.INSTANCE;
import java.util.Collection;
import java.util.List;
@@ -55,9 +54,11 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.Status;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -404,7 +405,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
raftService = null;
}
- raftManager.stopRaftNodes(INSTANCE);
+ raftManager.stopRaftNodes(CmgGroupId.INSTANCE);
localStateStorage.clear().get();
} catch (Exception e) {
@@ -506,14 +507,17 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(nodeNames, learnerNames);
+ Peer serverPeer = isLearner ?
configuration.learner(thisNodeConsistentId) :
configuration.peer(thisNodeConsistentId);
+
+ assert serverPeer != null;
+
try {
return raftManager
- .prepareRaftGroup(
- INSTANCE,
- isLearner ?
configuration.learner(thisNodeConsistentId) :
configuration.peer(thisNodeConsistentId),
+ .startRaftGroupNode(
+ new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
configuration,
- () -> new
CmgRaftGroupListener(clusterStateStorage, logicalTopology,
this::onLogicalTopologyChanged),
- this::createCmgRaftGroupEventsListener
+ new CmgRaftGroupListener(clusterStateStorage,
logicalTopology, this::onLogicalTopologyChanged),
+ createCmgRaftGroupEventsListener()
)
.thenApply(service -> new CmgRaftService(service,
clusterService, logicalTopology));
} catch (Exception e) {
@@ -667,7 +671,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10,
TimeUnit.SECONDS);
- raftManager.stopRaftNodes(INSTANCE);
+ raftManager.stopRaftNodes(CmgGroupId.INSTANCE);
// Fail the future to unblock dependent operations
joinFuture.completeExceptionally(new NodeStoppingException());
diff --git
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index f766e0a971..313c49a08d 100644
---
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -29,7 +29,6 @@ import static
org.apache.ignite.internal.metastorage.client.ItMetaStorageService
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.internal.metastorage.client.Operations.put;
import static org.apache.ignite.internal.metastorage.client.Operations.remove;
-import static
org.apache.ignite.internal.metastorage.common.MetastorageGroupId.INSTANCE;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.apache.ignite.utils.ClusterServiceTestUtils.waitForTopology;
@@ -69,6 +68,7 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.common.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.metastorage.server.AbstractCompoundCondition;
import org.apache.ignite.internal.metastorage.server.AbstractSimpleCondition;
@@ -83,8 +83,11 @@ import
org.apache.ignite.internal.metastorage.server.ValueCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition.Type;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -244,7 +247,8 @@ public class ItMetaStorageServiceTest {
public void afterTest() throws Exception {
Stream<AutoCloseable> stopRaftGroupServices =
raftGroupServices.stream().map(service -> service::shutdown);
- Stream<AutoCloseable> stopRaftGroups =
raftManagers.stream().map(manager -> () -> manager.stopRaftNodes(INSTANCE));
+ Stream<AutoCloseable> stopRaftGroups = raftManagers.stream()
+ .map(manager -> () ->
manager.stopRaftNodes(MetastorageGroupId.INSTANCE));
Stream<AutoCloseable> beforeNodeStop =
Stream.concat(raftManagers.stream(), cluster.stream()).map(c ->
c::beforeNodeStop);
@@ -906,7 +910,7 @@ public class ItMetaStorageServiceTest {
PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(Set.of(localName));
RaftGroupService metaStorageRaftSvc2 = raftManagers.get(1)
- .startRaftGroupService(INSTANCE, configuration)
+ .startRaftGroupService(MetastorageGroupId.INSTANCE,
configuration)
.get(3, TimeUnit.SECONDS);
raftGroupServices.add(metaStorageRaftSvc2);
@@ -1080,11 +1084,16 @@ public class ItMetaStorageServiceTest {
raftManagers.add(raftManager);
- return raftManager.prepareRaftGroup(
- INSTANCE,
-
configuration.peer(node.topologyService().localMember().name()),
- configuration,
- () -> new MetaStorageListener(mockStorage)
- );
+ Peer serverPeer =
configuration.peer(node.topologyService().localMember().name());
+
+ if (serverPeer == null) {
+ return
raftManager.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
+ } else {
+ var nodeId = new RaftNodeId(MetastorageGroupId.INSTANCE,
serverPeer);
+
+ return raftManager.startRaftGroupNode(
+ nodeId, configuration, new
MetaStorageListener(mockStorage), RaftGroupEventsListener.noopLsnr
+ );
+ }
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 0e5f5ae9e0..fdb20aa7e1 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.metastorage;
-import static
org.apache.ignite.internal.metastorage.common.MetastorageGroupId.INSTANCE;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.CURSOR_CLOSING_ERR;
@@ -45,6 +44,7 @@ import
org.apache.ignite.internal.metastorage.client.OperationTimeoutException;
import org.apache.ignite.internal.metastorage.client.StatementResult;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.metastorage.common.MetaStorageException;
+import org.apache.ignite.internal.metastorage.common.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
@@ -52,7 +52,9 @@ import
org.apache.ignite.internal.metastorage.watch.KeyCriterion;
import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -166,29 +168,33 @@ public class MetaStorageManager implements
IgniteComponent {
Peer localPeer = configuration.peer(thisNode.name());
- if (localPeer != null) {
- clusterService.topologyService().addEventHandler(new
TopologyEventHandler() {
- @Override
- public void onDisappeared(ClusterNode member) {
- metaStorageSvcFut.thenAccept(svc ->
svc.closeCursors(member.id()));
- }
- });
-
- storage.start();
- }
+ CompletableFuture<RaftGroupService> raftServiceFuture;
try {
- CompletableFuture<RaftGroupService> raftServiceFuture =
raftMgr.prepareRaftGroup(
- INSTANCE,
- localPeer,
- configuration,
- () -> new MetaStorageListener(storage)
- );
+ if (localPeer == null) {
+ raftServiceFuture =
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
+ } else {
+ clusterService.topologyService().addEventHandler(new
TopologyEventHandler() {
+ @Override
+ public void onDisappeared(ClusterNode member) {
+ metaStorageSvcFut.thenAccept(svc ->
svc.closeCursors(member.id()));
+ }
+ });
+
+ storage.start();
- return raftServiceFuture.thenApply(service -> new
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+ raftServiceFuture = raftMgr.startRaftGroupNode(
+ new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+ configuration,
+ new MetaStorageListener(storage),
+ RaftGroupEventsListener.noopLsnr
+ );
+ }
} catch (NodeStoppingException e) {
return CompletableFuture.failedFuture(e);
}
+
+ return raftServiceFuture.thenApply(service -> new
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
}
/** {@inheritDoc} */
@@ -229,7 +235,7 @@ public class MetaStorageManager implements IgniteComponent {
IgniteUtils.closeAll(
this::stopDeployedWatches,
() -> {
- if (raftMgr.stopRaftNodes(INSTANCE)) {
+ if
(raftMgr.stopRaftNodes(MetastorageGroupId.INSTANCE)) {
storage.close();
}
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index 0d3cc0d37c..09446af8a5 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -18,55 +18,30 @@
package org.apache.ignite.internal.raft;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.Nullable;
/**
* Raft manager.
*/
public interface RaftManager extends IgniteComponent {
/**
- * Optionally starts a Raft node and creates a Raft group service
providing operations on a Raft group.
+ * Starts a Raft group and a Raft service on the current node.
*
- * @param groupId Raft group ID.
- * @param serverPeer Local peer that will host the Raft node. If {@code
null} - no nodes will be started, but only the Raft client
- * service.
- * @param configuration Peers and Learners of the Raft group.
- * @param lsnrSupplier Raft group listener supplier.
- * @return Future representing pending completion of the operation.
- * @throws NodeStoppingException If node stopping intention was detected.
- */
- // TODO: remove this method, see
https://issues.apache.org/jira/browse/IGNITE-18374
- CompletableFuture<RaftGroupService> prepareRaftGroup(
- ReplicationGroupId groupId,
- @Nullable Peer serverPeer,
- PeersAndLearners configuration,
- Supplier<RaftGroupListener> lsnrSupplier
- ) throws NodeStoppingException;
-
- /**
- * Optionally starts a Raft node and creates a Raft group service
providing operations on a Raft group.
- *
- * @param groupId Raft group ID.
- * @param serverPeer Local peer that will host the Raft node. If {@code
null} - no nodes will be started, but only the Raft client
- * service.
+ * @param nodeId Raft node ID.
* @param configuration Peers and Learners of the Raft group.
- * @param lsnrSupplier Raft group listener supplier.
- * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
- * @return Future representing pending completion of the operation.
+ * @param lsnr Raft group listener.
+ * @param eventsLsnr Raft group events listener.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- CompletableFuture<RaftGroupService> prepareRaftGroup(
- ReplicationGroupId groupId,
- @Nullable Peer serverPeer,
+ CompletableFuture<RaftGroupService> startRaftGroupNode(
+ RaftNodeId nodeId,
PeersAndLearners configuration,
- Supplier<RaftGroupListener> lsnrSupplier,
- Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr
) throws NodeStoppingException;
/**
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
index 5852345397..b0e5fe49ba 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -224,7 +224,7 @@ public class ItLearnersTest extends IgniteAbstractTest {
RaftNode learner1 = nodes.get(1);
CompletableFuture<RaftGroupService> service2 =
- startRaftGroup(learner1,
configuration.learner(learner1.consistentId()), newConfiguration, new
TestRaftGroupListener());
+ startRaftGroup(learner1,
newConfiguration.learner(learner1.consistentId()), newConfiguration, new
TestRaftGroupListener());
// Check that learners and peers have been set correctly.
Stream.of(service1, service2).forEach(service -> {
@@ -401,11 +401,11 @@ public class ItLearnersTest extends IgniteAbstractTest {
RaftGroupListener listener
) {
try {
- CompletableFuture<RaftGroupService> future =
node.loza.prepareRaftGroup(
- RAFT_GROUP_ID,
- serverPeer,
+ CompletableFuture<RaftGroupService> future =
node.loza.startRaftGroupNode(
+ new RaftNodeId(RAFT_GROUP_ID, serverPeer),
configuration,
- () -> listener
+ listener,
+ RaftGroupEventsListener.noopLsnr
);
return future.thenApply(s -> {
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index c9374ca0cd..fbcf3dcf3a 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -35,7 +35,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -77,17 +76,15 @@ public class ItLozaTest {
* @return Raft group service.
*/
private RaftGroupService startClient(TestReplicationGroupId groupId,
ClusterNode node, Loza loza) throws Exception {
- Supplier<RaftGroupListener> raftGroupListenerSupplier = () -> {
- RaftGroupListener raftGroupListener =
mock(RaftGroupListener.class);
+ RaftGroupListener raftGroupListener = mock(RaftGroupListener.class);
- when(raftGroupListener.onSnapshotLoad(any())).thenReturn(true);
-
- return raftGroupListener;
- };
+ when(raftGroupListener.onSnapshotLoad(any())).thenReturn(true);
PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(Set.of(node.name()));
- return loza.prepareRaftGroup(groupId, configuration.peer(node.name()),
configuration, raftGroupListenerSupplier)
+ var nodeId = new RaftNodeId(groupId, configuration.peer(node.name()));
+
+ return loza.startRaftGroupNode(nodeId, configuration,
raftGroupListener, RaftGroupEventsListener.noopLsnr)
.get(10, TimeUnit.SECONDS);
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index 226c4045bf..10a4a25466 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -224,14 +224,14 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
}
CompletableFuture<RaftGroupService> startRaftGroup(PeersAndLearners
configuration) {
+ String nodeName =
clusterService.topologyService().localMember().name();
+
+ Peer serverPeer = configuration.peer(nodeName);
+
+ var nodeId = new RaftNodeId(RAFT_GROUP_NAME, serverPeer == null ?
configuration.learner(nodeName) : serverPeer);
+
try {
- raftGroupService = loza.prepareRaftGroup(
- RAFT_GROUP_NAME,
-
configuration.peer(clusterService.topologyService().localMember().name()),
- configuration,
- () -> mock(RaftGroupListener.class),
- () -> eventsListener
- );
+ raftGroupService = loza.startRaftGroupNode(nodeId,
configuration, mock(RaftGroupListener.class), eventsListener);
} catch (NodeStoppingException e) {
return CompletableFuture.failedFuture(e);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 0123e96155..968313d7c8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.raft;
-import static org.apache.ignite.internal.raft.RaftGroupEventsListener.noopLsnr;
-
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -26,7 +24,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -166,62 +163,13 @@ public class Loza implements RaftManager {
}
@Override
- public CompletableFuture<RaftGroupService> prepareRaftGroup(
- ReplicationGroupId groupId,
- @Nullable Peer serverPeer,
- PeersAndLearners configuration,
- Supplier<RaftGroupListener> lsnrSupplier
- ) throws NodeStoppingException {
- return prepareRaftGroup(groupId, serverPeer, configuration,
lsnrSupplier, () -> noopLsnr, RaftGroupOptions.defaults());
- }
-
- @Override
- public CompletableFuture<RaftGroupService> prepareRaftGroup(
- ReplicationGroupId groupId,
- @Nullable Peer serverPeer,
- PeersAndLearners configuration,
- Supplier<RaftGroupListener> lsnrSupplier,
- Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier
- ) throws NodeStoppingException {
- return prepareRaftGroup(groupId, serverPeer, configuration,
lsnrSupplier, raftGrpEvtsLsnrSupplier, RaftGroupOptions.defaults());
- }
-
- /**
- * Optionally starts a Raft node and creates a Raft group service
providing operations on a Raft group.
- *
- * @param groupId Raft group ID.
- * @param serverPeer Local peer that will host the Raft node. If {@code
null} - no nodes will be started, but only the Raft client
- * service.
- * @param configuration Peers and Learners of the Raft group.
- * @param lsnrSupplier Raft group listener supplier.
- * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
- * @param groupOptions Options to apply to the group.
- * @return Future representing pending completion of the operation.
- * @throws NodeStoppingException If node stopping intention was detected.
- */
- private CompletableFuture<RaftGroupService> prepareRaftGroup(
- ReplicationGroupId groupId,
- @Nullable Peer serverPeer,
+ public CompletableFuture<RaftGroupService> startRaftGroupNode(
+ RaftNodeId nodeId,
PeersAndLearners configuration,
- Supplier<RaftGroupListener> lsnrSupplier,
- Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
- RaftGroupOptions groupOptions
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr
) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
- }
-
- try {
- if (serverPeer != null) {
- var nodeId = new RaftNodeId(groupId, serverPeer);
-
- startRaftGroupNodeInternal(nodeId, configuration,
lsnrSupplier.get(), raftGrpEvtsLsnrSupplier.get(), groupOptions);
- }
-
- return startRaftGroupServiceInternal(groupId, configuration);
- } finally {
- busyLock.leaveBusy();
- }
+ return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr,
RaftGroupOptions.defaults());
}
/**
@@ -234,7 +182,7 @@ public class Loza implements RaftManager {
* @param groupOptions Options to apply to the group.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- public void startRaftGroupNode(
+ public CompletableFuture<RaftGroupService> startRaftGroupNode(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -246,7 +194,7 @@ public class Loza implements RaftManager {
}
try {
- startRaftGroupNodeInternal(nodeId, configuration, lsnr,
eventsLsnr, groupOptions);
+ return startRaftGroupNodeInternal(nodeId, configuration, lsnr,
eventsLsnr, groupOptions);
} finally {
busyLock.leaveBusy();
}
@@ -268,7 +216,7 @@ public class Loza implements RaftManager {
}
}
- private void startRaftGroupNodeInternal(
+ private CompletableFuture<RaftGroupService> startRaftGroupNodeInternal(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -287,6 +235,8 @@ public class Loza implements RaftManager {
nodeId
));
}
+
+ return startRaftGroupServiceInternal(nodeId.groupId(), configuration);
}
private CompletableFuture<RaftGroupService>
startRaftGroupServiceInternal(ReplicationGroupId grpId, PeersAndLearners
configuration) {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index e12f4ab839..d32d000b2c 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -72,10 +72,13 @@ public class LozaTest extends IgniteAbstractTest {
Peer serverPeer = configuration.peer("test1");
+ assertThrows(
+ NodeStoppingException.class,
+ () -> loza.startRaftGroupNode(new RaftNodeId(raftGroupId,
serverPeer), configuration, null, null)
+ );
assertThrows(NodeStoppingException.class, () ->
loza.startRaftGroupService(raftGroupId, configuration));
assertThrows(NodeStoppingException.class, () -> loza.stopRaftNode(new
RaftNodeId(raftGroupId, serverPeer)));
assertThrows(NodeStoppingException.class, () ->
loza.stopRaftNodes(raftGroupId));
- assertThrows(NodeStoppingException.class, () ->
loza.prepareRaftGroup(raftGroupId, serverPeer, configuration, () -> null));
}
/**
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index f2e26cb317..feb8242f57 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -442,7 +442,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
* @return Table manager.
*/
private TableManager mockManagers() throws NodeStoppingException {
- when(rm.prepareRaftGroup(any(), any(), any(), any())).thenAnswer(mock
-> {
+ when(rm.startRaftGroupNode(any(), any(), any(),
any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer("test"));
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 9da886c7b7..9673817b74 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -416,17 +417,17 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(partAssignments);
- CompletableFuture<Void> partitionReadyFuture =
raftServers.get(assignment).prepareRaftGroup(
- grpId,
- configuration.peer(assignment),
+ CompletableFuture<Void> partitionReadyFuture =
raftServers.get(assignment).startRaftGroupNode(
+ new RaftNodeId(grpId, configuration.peer(assignment)),
configuration,
- () -> new PartitionListener(
+ new PartitionListener(
new
TestPartitionDataStorage(testMpPartStorage),
new TestTxStateStorage(),
txManagers.get(assignment),
() -> Map.of(pkStorage.get().id(),
pkStorage.get()),
partId
- )
+ ),
+ RaftGroupEventsListener.noopLsnr
).thenAccept(
raftSvc -> {
try {