This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8cecf78e30 IGNITE-23669 Add missing busy locks to ClusterManagementGroupManager (#4717)
8cecf78e30 is described below
commit 8cecf78e305c90b0e7e5f428ac756ac81ca692ee
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Nov 14 14:37:41 2024 +0400
IGNITE-23669 Add missing busy locks to ClusterManagementGroupManager (#4717)
---
.../management/ClusterManagementGroupManager.java | 81 ++++++++++++----------
1 file changed, 46 insertions(+), 35 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index d772e38f4a..ba82c87918 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -27,8 +27,6 @@ import static
org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.util.Collection;
import java.util.List;
@@ -41,6 +39,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.function.Supplier;
import
org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState;
import
org.apache.ignite.internal.cluster.management.events.BeforeStartRaftGroupEventParameters;
import
org.apache.ignite.internal.cluster.management.events.ClusterManagerGroupEvent;
@@ -52,6 +51,7 @@ import
org.apache.ignite.internal.cluster.management.network.messages.ClusterSta
import
org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import
org.apache.ignite.internal.cluster.management.network.messages.InitErrorMessage;
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
import org.apache.ignite.internal.cluster.management.raft.CmgRaftGroupListener;
@@ -467,20 +467,17 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
LOG.info("Init command received, starting the CMG [nodes={}]",
msg.cmgNodes());
serviceFuture = startCmgRaftServiceWithEvents(msg.cmgNodes(),
msg.initialClusterConfiguration())
- .whenComplete((v, e) -> {
+ .whenComplete((v, e) -> inBusyLock(() -> {
if (e != null) {
- e = unwrapCause(e);
+ Throwable finalEx = unwrapCause(e);
- LOG.error("Unable to start CMG Raft service",
e);
+ LOG.error("Unable to start CMG Raft service",
finalEx);
- NetworkMessage response =
msgFactory.initErrorMessage()
- .cause(e.getMessage())
- .shouldCancel(!(e instanceof
IllegalInitArgumentException))
- .build();
+ NetworkMessage response =
initErrorMessage(finalEx);
clusterService.messagingService().respond(sender, response, correlationId);
}
- });
+ }));
} else {
// Raft service has been started, which means that this node
has already received an init command at least once.
LOG.info("Init command received, but the CMG has already been
started");
@@ -490,8 +487,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
// handle this case by applying only the first attempt and
returning the actual cluster state for all other
// attempts.
raftService = serviceFuture
- .thenCompose(service -> doInit(service, msg, null)
- .handle((v, e) -> {
+ .thenCompose(service -> inBusyLockAsync(() ->
doInit(service, msg, null))
+ .handle((v, e) -> inBusyLock(() -> {
NetworkMessage response;
if (e == null) {
@@ -499,20 +496,17 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
response =
msgFactory.initCompleteMessage().build();
} else {
- e = unwrapCause(e);
+ Throwable finalEx = unwrapCause(e);
- LOG.warn("Error when initializing the
CMG", e);
+ LOG.warn("Error when initializing the
CMG", finalEx);
- response = msgFactory.initErrorMessage()
- .cause(e.getMessage())
- .shouldCancel(!(e instanceof
IllegalInitArgumentException))
- .build();
+ response = initErrorMessage(finalEx);
}
clusterService.messagingService().respond(sender, response, correlationId);
return service;
- }))
+ })))
.whenComplete((cmgRaftService, e) -> {
if (e != null) {
LOG.warn("Error when handling the CMG Init", e);
@@ -561,13 +555,13 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
// The cluster state is broadcast via the messaging service;
hence, the future must be completed here on the leader node.
- raftServiceAfterJoin().thenAccept(service -> inBusyLock(busyLock,
() -> {
+ raftServiceAfterJoin().thenAccept(service -> inBusyLock(() -> {
service.readClusterState()
.thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
updateLogicalTopology(service)
- .thenCompose(v -> inBusyLock(busyLock, () ->
service.updateLearners(term)))
- .thenAccept(v -> inBusyLock(busyLock, () -> {
+ .thenCompose(v -> inBusyLock(() ->
service.updateLearners(term)))
+ .thenAccept(v -> inBusyLock(() -> {
// Register a listener to send ClusterState
messages to new nodes.
TopologyService topologyService =
clusterService.topologyService();
@@ -601,6 +595,13 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
}
}
+ private InitErrorMessage initErrorMessage(Throwable finalEx) {
+ return msgFactory.initErrorMessage()
+ .cause(finalEx.getMessage())
+ .shouldCancel(!(finalEx instanceof
IllegalInitArgumentException))
+ .build();
+ }
+
/**
* This method must be executed upon CMG leader election in order to
regain logical topology consistency in case some nodes left the
* physical topology during the election. Newly appeared nodes will be
added automatically after the new leader broadcasts the current
@@ -608,7 +609,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
*/
private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
return service.logicalTopology()
- .thenCompose(logicalTopology -> inBusyLock(busyLock, () -> {
+ .thenCompose(logicalTopology -> inBusyLock(() -> {
Set<UUID> physicalTopologyIds =
clusterService.topologyService().allMembers()
.stream()
.map(ClusterNode::id)
@@ -631,7 +632,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
/** Delegates call to {@link #destroyCmg()} but fires the associated
events. */
private CompletableFuture<Void> destroyCmgWithEvents() {
LOG.info("CMG destruction procedure started");
- return inBusyLockAsync(busyLock,
+ return inBusyLockAsync(
() ->
fireEvent(ClusterManagerGroupEvent.BEFORE_DESTROY_RAFT_GROUP,
EmptyEventParameters.INSTANCE)
.thenRunAsync(this::destroyCmg, this.scheduledExecutor)
.exceptionally(err -> {
@@ -685,7 +686,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
// Raft service might have been started on wrong CMG nodes,
because CMG state can change while a node is offline. In this
// case we need to re-create the service.
raftService = raftService
- .handle((service, e) -> {
+ .handle((service, e) -> inBusyLockAsync(() -> {
if (service != null &&
service.nodeNames().equals(state.cmgNodes())) {
LOG.info("ClusterStateMessage received, but
the CMG service is already started");
@@ -696,17 +697,15 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
// Service could not be started for some
reason, which might be due to starting on incorrect CMG nodes
assert e != null;
- if (e instanceof CompletionException) {
- e = e.getCause();
- }
+ Throwable finalEx = e instanceof
CompletionException ? e.getCause() : e;
// Nothing can be done if the node has not
passed validation.
- if (e instanceof JoinDeniedException) {
- return
CompletableFuture.<CmgRaftService>failedFuture(e);
+ if (finalEx instanceof JoinDeniedException) {
+ return failedFuture(finalEx);
}
LOG.warn("CMG service could not be started on
previous attempts. "
- + "Re-creating the CMG Raft service
[reason={}]", e, e.getMessage());
+ + "Re-creating the CMG Raft service
[reason={}]", finalEx, finalEx.getMessage());
return initCmgRaftService(state);
} else {
@@ -717,7 +716,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
return destroyCmgWithEvents()
.thenCompose(none ->
initCmgRaftService(state));
}
- })
+ }))
.thenCompose(Function.identity());
}
}
@@ -747,7 +746,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
private CompletableFuture<CmgRaftService>
startCmgRaftServiceWithEvents(Set<String> nodeNames, @Nullable String
initialClusterConfig) {
BeforeStartRaftGroupEventParameters params = new
BeforeStartRaftGroupEventParameters(nodeNames, initialClusterConfig);
return fireEvent(ClusterManagerGroupEvent.BEFORE_START_RAFT_GROUP,
params)
- .thenComposeAsync(v -> startCmgRaftService(nodeNames),
scheduledExecutor)
+ .thenComposeAsync(v -> inBusyLockAsync(() ->
startCmgRaftService(nodeNames)), scheduledExecutor)
.whenComplete((v, e) -> {
if (e != null) {
LOG.warn("Error when initializing the CMG", e);
@@ -823,13 +822,13 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
*/
private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState
state) {
return startCmgRaftServiceWithEvents(state.cmgNodes(),
state.initialClusterConfiguration())
- .thenCompose(service -> {
+ .thenCompose(service -> inBusyLockAsync(() -> {
var localState = new LocalState(state.cmgNodes(),
state.clusterTag());
localStateStorage.saveLocalState(localState);
return validateAgainstCluster(service, state.clusterTag());
- });
+ }));
}
private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService
raftService) {
@@ -1138,6 +1137,18 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
});
}
+ private void inBusyLock(Runnable action) {
+ IgniteUtils.inBusyLock(busyLock, action);
+ }
+
+ private <T> T inBusyLock(Supplier<T> action) {
+ return IgniteUtils.inBusyLock(busyLock, action);
+ }
+
+ private <T> CompletableFuture<T>
inBusyLockAsync(Supplier<CompletableFuture<T>> action) {
+ return IgniteUtils.inBusyLockAsync(busyLock, action);
+ }
+
@TestOnly
LogicalTopologyImpl logicalTopologyImpl() {
return (LogicalTopologyImpl) logicalTopology;