This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 01e0035c09 IGNITE-19831 Rename RaftManager methods to better reflect
their behavior (#2251)
01e0035c09 is described below
commit 01e0035c09bb643c919f94199d31a5dcc59969aa
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Jun 26 11:24:05 2023 +0300
IGNITE-19831 Rename RaftManager methods to better reflect their behavior
(#2251)
---
.../management/raft/ItCmgRaftServiceTest.java | 2 +-
.../management/ClusterManagementGroupManager.java | 2 +-
.../metastorage/impl/ItMetaStorageServiceTest.java | 4 +-
.../server/OnRevisionAppliedCallback.java | 2 +-
.../impl/StandaloneMetaStorageManager.java | 2 +-
.../apache/ignite/internal/raft/RaftManager.java | 56 ++++++++++++++++++----
.../ignite/internal/raft/ItLearnersTest.java | 2 +-
.../apache/ignite/internal/raft/ItLozaTest.java | 2 +-
.../internal/raft/ItRaftGroupServiceTest.java | 4 +-
.../java/org/apache/ignite/internal/raft/Loza.java | 50 +++++++++++--------
.../org/apache/ignite/internal/raft/LozaTest.java | 2 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
12 files changed, 90 insertions(+), 40 deletions(-)
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 624ac35dc0..17132b91c3 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -128,7 +128,7 @@ public class ItCmgRaftServiceTest {
if (serverPeer == null) {
raftService =
raftManager.startRaftGroupService(CmgGroupId.INSTANCE, configuration);
} else {
- raftService = raftManager.startRaftGroupNode(
+ raftService =
raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
configuration,
new CmgRaftGroupListener(
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 063c6067c3..594fc93e22 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -590,7 +590,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
try {
return raftManager
- .startRaftGroupNode(
+ .startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
raftConfiguration,
new CmgRaftGroupListener(clusterStateStorage,
logicalTopology, this::onLogicalTopologyChanged),
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index bb8fb34feb..dfc5a3d3ab 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -235,7 +235,9 @@ public class ItMetaStorageServiceTest {
var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
try {
- return raftManager.startRaftGroupNode(raftNodeId,
configuration, listener, RaftGroupEventsListener.noopLsnr);
+ return raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
+ raftNodeId, configuration, listener,
RaftGroupEventsListener.noopLsnr
+ );
} catch (NodeStoppingException e) {
throw new IllegalStateException(e);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index e02534db44..93a2fc1c37 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -29,7 +29,7 @@ public interface OnRevisionAppliedCallback {
/**
* Notifies of completion of processing of Meta Storage watches for a
particular revision.
*
- * @param watchEvent Event with modified Meta Storage entries processed at
least one Watch.
+ * @param watchEvent Event with modified Meta Storage entries processed by
at least one Watch.
* @param newSafeTime Safe time of the applied revision.
* @return Future that represents the state of the execution of the
callback.
*/
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 09d651bf3d..4f3770c00e 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -118,7 +118,7 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
RaftGroupService raftGroupService = mock(RaftGroupService.class);
try {
- when(raftManager.startRaftGroupNode(
+ when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(
any(),
any(),
listenerCaptor.capture(),
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index d819aeebee..861e19ce6c 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -26,18 +26,50 @@ import org.apache.ignite.lang.NodeStoppingException;
/**
* Raft manager.
+ *
+ * <p>This class contains two groups of methods for starting Raft nodes:
{@code #startRaftGroupNode} and
+ * {@code startRaftGroupNodeAndWaitNodeReadyFuture} (and its overloads). When
using {@code #startRaftGroupNode}, Raft log re-application
+ * does not get performed and external synchronisation methods must be used to
avoid observing a Raft node in inconsistent state. The other
+ * group of methods synchronously waits for the Raft log to be re-applied, so
no external synchronisation is required.
+ *
+ * <p>Usually Raft recovery is done synchronously, but sometimes there's an
implicit dependency between Raft nodes, where the recovery of
+ * one node triggers the recovery of the other and vice versa. In this case,
{@code #startRaftGroupNode} group of methods should be used
+ * to avoid deadlocks during Raft node startup.
*/
public interface RaftManager extends IgniteComponent {
+ /**
+ * Starts a Raft group and a Raft service on the current node, using the
given service factory.
+ *
+ * <p>Does not wait for the Raft log to be applied.
+ *
+ * @param nodeId Raft node ID.
+ * @param configuration Peers and Learners of the Raft group.
+ * @param lsnr Raft group listener.
+ * @param eventsLsnr Raft group events listener.
+ * @param factory Service factory.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr,
+ RaftServiceFactory<T> factory
+ ) throws NodeStoppingException;
+
/**
* Starts a Raft group and a Raft service on the current node.
*
+ * <p>Synchronously waits for the Raft log to be applied.
+ *
* @param nodeId Raft node ID.
* @param configuration Peers and Learners of the Raft group.
* @param lsnr Raft group listener.
* @param eventsLsnr Raft group events listener.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- CompletableFuture<RaftGroupService> startRaftGroupNode(
+ // FIXME: IGNITE-19047 Meta storage and cmg raft log re-application in
async manner
+ CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
@@ -45,42 +77,46 @@ public interface RaftManager extends IgniteComponent {
) throws NodeStoppingException;
/**
- * Starts a Raft group and a Raft service on the current node, using the
given raft group service.
+ * Starts a Raft group and a Raft service on the current node.
+ *
+ * <p>Synchronously waits for the Raft log to be applied.
*
* @param nodeId Raft node ID.
* @param configuration Peers and Learners of the Raft group.
* @param lsnr Raft group listener.
* @param eventsLsnr Raft group events listener.
- * @param factory Service factory.
+ * @param disruptorConfiguration Configuration own (not shared) striped
disruptor for FSMCaller service of raft node.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+ // FIXME: IGNITE-19047 Meta storage and cmg raft log re-application in
async manner
+ CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
RaftGroupEventsListener eventsLsnr,
- RaftServiceFactory<T> factory
+ RaftNodeDisruptorConfiguration disruptorConfiguration
) throws NodeStoppingException;
/**
- * Starts a Raft group and a Raft service on the current node.
+ * Starts a Raft group and a Raft service on the current node, using the
given service factory.
*
- * <p>Synchronously waits for the plot log to be applied.
+ * <p>Synchronously waits for the Raft log to be applied.
*
* @param nodeId Raft node ID.
* @param configuration Peers and Learners of the Raft group.
* @param lsnr Raft group listener.
* @param eventsLsnr Raft group events listener.
- * @param ownFsmCallerExecutorDisruptorConfig Configuration own (not
shared) striped disruptor for FSMCaller service of raft node.
+ * @param factory Service factory.
* @throws NodeStoppingException If node stopping intention was detected.
*/
// FIXME: IGNITE-19047 Meta storage and cmg raft log re-application in
async manner
- CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNodeAndWaitNodeReadyFuture(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
RaftGroupEventsListener eventsLsnr,
- RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig
+ RaftNodeDisruptorConfiguration disruptorConfiguration,
+ RaftServiceFactory<T> factory
) throws NodeStoppingException;
/**
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
index c5a8ac7237..149cfebf01 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -401,7 +401,7 @@ public class ItLearnersTest extends IgniteAbstractTest {
RaftGroupListener listener
) {
try {
- return node.loza.startRaftGroupNode(
+ return node.loza.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(RAFT_GROUP_ID, serverPeer),
memberConfiguration,
listener,
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 14979795c1..73f278dd85 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -83,7 +83,7 @@ public class ItLozaTest {
var nodeId = new RaftNodeId(groupId, configuration.peer(node.name()));
- return loza.startRaftGroupNode(nodeId, configuration,
raftGroupListener, RaftGroupEventsListener.noopLsnr)
+ return loza.startRaftGroupNodeAndWaitNodeReadyFuture(nodeId,
configuration, raftGroupListener, RaftGroupEventsListener.noopLsnr)
.get(10, TimeUnit.SECONDS);
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index a716640956..080800ea1d 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -233,7 +233,9 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
var nodeId = new RaftNodeId(RAFT_GROUP_NAME, serverPeer == null ?
configuration.learner(nodeName) : serverPeer);
try {
- raftGroupService = loza.startRaftGroupNode(nodeId,
configuration, mock(RaftGroupListener.class), eventsListener);
+ raftGroupService =
loza.startRaftGroupNodeAndWaitNodeReadyFuture(
+ nodeId, configuration, mock(RaftGroupListener.class),
eventsListener
+ );
} catch (NodeStoppingException e) {
return CompletableFuture.failedFuture(e);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index a89cc46296..9e4ea288cf 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -171,21 +171,6 @@ public class Loza implements RaftManager {
raftServer.stop();
}
- @Override
- public CompletableFuture<RaftGroupService> startRaftGroupNode(
- RaftNodeId nodeId,
- PeersAndLearners configuration,
- RaftGroupListener lsnr,
- RaftGroupEventsListener eventsLsnr
- ) throws NodeStoppingException {
- CompletableFuture<RaftGroupService> fut = startRaftGroupNode(nodeId,
configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults());
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta
storage and cmg raft log re-application in async manner
- raftServer.raftNodeReadyFuture(nodeId.groupId()).join();
-
- return fut;
- }
-
@Override
public <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNode(
RaftNodeId nodeId,
@@ -247,28 +232,53 @@ public class Loza implements RaftManager {
}
}
+ @Override
+ public CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr
+ ) throws NodeStoppingException {
+ CompletableFuture<RaftGroupService> fut = startRaftGroupNode(nodeId,
configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults());
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta
storage and cmg raft log re-application in async manner
+ raftServer.raftNodeReadyFuture(nodeId.groupId()).join();
+
+ return fut;
+ }
+
@Override
public CompletableFuture<RaftGroupService>
startRaftGroupNodeAndWaitNodeReadyFuture(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
RaftGroupEventsListener eventsLsnr,
- RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig
+ RaftNodeDisruptorConfiguration disruptorConfiguration
) throws NodeStoppingException {
- assert ownFsmCallerExecutorDisruptorConfig != null;
+ return startRaftGroupNodeAndWaitNodeReadyFuture(nodeId, configuration,
lsnr, eventsLsnr, disruptorConfiguration, null);
+ }
+ @Override
+ public <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNodeAndWaitNodeReadyFuture(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr,
+ RaftNodeDisruptorConfiguration disruptorConfiguration,
+ @Nullable RaftServiceFactory<T> factory
+ ) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- CompletableFuture<RaftGroupService> startRaftServiceFuture =
startRaftGroupNodeInternal(
+ CompletableFuture<T> startRaftServiceFuture =
startRaftGroupNodeInternal(
nodeId,
configuration,
lsnr,
eventsLsnr,
-
RaftGroupOptions.defaults().ownFsmCallerExecutorDisruptorConfig(ownFsmCallerExecutorDisruptorConfig),
- null
+
RaftGroupOptions.defaults().ownFsmCallerExecutorDisruptorConfig(disruptorConfiguration),
+ factory
);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta
storage and cmg raft log re-application in async manner
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 5b74fd6f96..fc64784734 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -78,7 +78,7 @@ public class LozaTest extends IgniteAbstractTest {
assertThrows(
NodeStoppingException.class,
- () -> loza.startRaftGroupNode(new RaftNodeId(raftGroupId,
serverPeer), configuration, null, null)
+ () -> loza.startRaftGroupNodeAndWaitNodeReadyFuture(new
RaftNodeId(raftGroupId, serverPeer), configuration, null, null)
);
assertThrows(NodeStoppingException.class, () ->
loza.startRaftGroupService(raftGroupId, configuration));
assertThrows(NodeStoppingException.class, () -> loza.stopRaftNode(new
RaftNodeId(raftGroupId, serverPeer)));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 773393d6db..ab52605ca2 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -519,7 +519,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
* @return Table manager.
*/
private TableManager mockManagers() throws NodeStoppingException {
- when(rm.startRaftGroupNode(any(), any(), any(),
any())).thenAnswer(mock -> {
+ when(rm.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), any(),
any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer("test"));