This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8cecf78e30 IGNITE-23669 Add missing busy locks to ClusterManagementGroupManager (#4717)
8cecf78e30 is described below

commit 8cecf78e305c90b0e7e5f428ac756ac81ca692ee
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Nov 14 14:37:41 2024 +0400

    IGNITE-23669 Add missing busy locks to ClusterManagementGroupManager (#4717)
---
 .../management/ClusterManagementGroupManager.java  | 81 ++++++++++++----------
 1 file changed, 46 insertions(+), 35 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index d772e38f4a..ba82c87918 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -27,8 +27,6 @@ import static 
org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 
 import java.util.Collection;
 import java.util.List;
@@ -41,6 +39,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import 
org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState;
 import 
org.apache.ignite.internal.cluster.management.events.BeforeStartRaftGroupEventParameters;
 import 
org.apache.ignite.internal.cluster.management.events.ClusterManagerGroupEvent;
@@ -52,6 +51,7 @@ import 
org.apache.ignite.internal.cluster.management.network.messages.ClusterSta
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import 
org.apache.ignite.internal.cluster.management.network.messages.InitErrorMessage;
 import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
 import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
 import org.apache.ignite.internal.cluster.management.raft.CmgRaftGroupListener;
@@ -467,20 +467,17 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                 LOG.info("Init command received, starting the CMG [nodes={}]", 
msg.cmgNodes());
 
                 serviceFuture = startCmgRaftServiceWithEvents(msg.cmgNodes(), 
msg.initialClusterConfiguration())
-                        .whenComplete((v, e) -> {
+                        .whenComplete((v, e) -> inBusyLock(() -> {
                             if (e != null) {
-                                e = unwrapCause(e);
+                                Throwable finalEx = unwrapCause(e);
 
-                                LOG.error("Unable to start CMG Raft service", 
e);
+                                LOG.error("Unable to start CMG Raft service", 
finalEx);
 
-                                NetworkMessage response = 
msgFactory.initErrorMessage()
-                                        .cause(e.getMessage())
-                                        .shouldCancel(!(e instanceof 
IllegalInitArgumentException))
-                                        .build();
+                                NetworkMessage response = 
initErrorMessage(finalEx);
 
                                 
clusterService.messagingService().respond(sender, response, correlationId);
                             }
-                        });
+                        }));
             } else {
                 // Raft service has been started, which means that this node 
has already received an init command at least once.
                 LOG.info("Init command received, but the CMG has already been 
started");
@@ -490,8 +487,8 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             // handle this case by applying only the first attempt and 
returning the actual cluster state for all other
             // attempts.
             raftService = serviceFuture
-                    .thenCompose(service -> doInit(service, msg, null)
-                            .handle((v, e) -> {
+                    .thenCompose(service -> inBusyLockAsync(() -> 
doInit(service, msg, null))
+                            .handle((v, e) -> inBusyLock(() -> {
                                 NetworkMessage response;
 
                                 if (e == null) {
@@ -499,20 +496,17 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
 
                                     response = 
msgFactory.initCompleteMessage().build();
                                 } else {
-                                    e = unwrapCause(e);
+                                    Throwable finalEx = unwrapCause(e);
 
-                                    LOG.warn("Error when initializing the 
CMG", e);
+                                    LOG.warn("Error when initializing the 
CMG", finalEx);
 
-                                    response = msgFactory.initErrorMessage()
-                                            .cause(e.getMessage())
-                                            .shouldCancel(!(e instanceof 
IllegalInitArgumentException))
-                                            .build();
+                                    response = initErrorMessage(finalEx);
                                 }
 
                                 
clusterService.messagingService().respond(sender, response, correlationId);
 
                                 return service;
-                            }))
+                            })))
                     .whenComplete((cmgRaftService, e) -> {
                         if (e != null) {
                             LOG.warn("Error when handling the CMG Init", e);
@@ -561,13 +555,13 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             LOG.info("CMG leader has been elected, executing onLeaderElected 
callback");
 
             // The cluster state is broadcast via the messaging service; 
hence, the future must be completed here on the leader node.
-            raftServiceAfterJoin().thenAccept(service -> inBusyLock(busyLock, 
() -> {
+            raftServiceAfterJoin().thenAccept(service -> inBusyLock(() -> {
                 service.readClusterState()
                         .thenAccept(state -> 
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
 
                 updateLogicalTopology(service)
-                        .thenCompose(v -> inBusyLock(busyLock, () -> 
service.updateLearners(term)))
-                        .thenAccept(v -> inBusyLock(busyLock, () -> {
+                        .thenCompose(v -> inBusyLock(() -> 
service.updateLearners(term)))
+                        .thenAccept(v -> inBusyLock(() -> {
                             // Register a listener to send ClusterState 
messages to new nodes.
                             TopologyService topologyService = 
clusterService.topologyService();
 
@@ -601,6 +595,13 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         }
     }
 
+    private InitErrorMessage initErrorMessage(Throwable finalEx) {
+        return msgFactory.initErrorMessage()
+                .cause(finalEx.getMessage())
+                .shouldCancel(!(finalEx instanceof 
IllegalInitArgumentException))
+                .build();
+    }
+
     /**
      * This method must be executed upon CMG leader election in order to 
regain logical topology consistency in case some nodes left the
      * physical topology during the election. Newly appeared nodes will be 
added automatically after the new leader broadcasts the current
@@ -608,7 +609,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
      */
     private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
         return service.logicalTopology()
-                .thenCompose(logicalTopology -> inBusyLock(busyLock, () -> {
+                .thenCompose(logicalTopology -> inBusyLock(() -> {
                     Set<UUID> physicalTopologyIds = 
clusterService.topologyService().allMembers()
                             .stream()
                             .map(ClusterNode::id)
@@ -631,7 +632,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
     /** Delegates call to {@link #destroyCmg()} but fires the associated 
events. */
     private CompletableFuture<Void> destroyCmgWithEvents() {
         LOG.info("CMG destruction procedure started");
-        return inBusyLockAsync(busyLock,
+        return inBusyLockAsync(
                 () -> 
fireEvent(ClusterManagerGroupEvent.BEFORE_DESTROY_RAFT_GROUP, 
EmptyEventParameters.INSTANCE)
                         .thenRunAsync(this::destroyCmg, this.scheduledExecutor)
                         .exceptionally(err -> {
@@ -685,7 +686,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                 // Raft service might have been started on wrong CMG nodes, 
because CMG state can change while a node is offline. In this
                 // case we need to re-create the service.
                 raftService = raftService
-                        .handle((service, e) -> {
+                        .handle((service, e) -> inBusyLockAsync(() -> {
                             if (service != null && 
service.nodeNames().equals(state.cmgNodes())) {
                                 LOG.info("ClusterStateMessage received, but 
the CMG service is already started");
 
@@ -696,17 +697,15 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                                 // Service could not be started for some 
reason, which might be due to starting on incorrect CMG nodes
                                 assert e != null;
 
-                                if (e instanceof CompletionException) {
-                                    e = e.getCause();
-                                }
+                                Throwable finalEx = e instanceof 
CompletionException ? e.getCause() : e;
 
                                 // Nothing can be done if the node has not 
passed validation.
-                                if (e instanceof JoinDeniedException) {
-                                    return 
CompletableFuture.<CmgRaftService>failedFuture(e);
+                                if (finalEx instanceof JoinDeniedException) {
+                                    return failedFuture(finalEx);
                                 }
 
                                 LOG.warn("CMG service could not be started on 
previous attempts. "
-                                        + "Re-creating the CMG Raft service 
[reason={}]", e, e.getMessage());
+                                        + "Re-creating the CMG Raft service 
[reason={}]", finalEx, finalEx.getMessage());
 
                                 return initCmgRaftService(state);
                             } else {
@@ -717,7 +716,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                                 return destroyCmgWithEvents()
                                         .thenCompose(none -> 
initCmgRaftService(state));
                             }
-                        })
+                        }))
                         .thenCompose(Function.identity());
             }
         }
@@ -747,7 +746,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
     private CompletableFuture<CmgRaftService> 
startCmgRaftServiceWithEvents(Set<String> nodeNames, @Nullable String 
initialClusterConfig) {
         BeforeStartRaftGroupEventParameters params = new 
BeforeStartRaftGroupEventParameters(nodeNames, initialClusterConfig);
         return fireEvent(ClusterManagerGroupEvent.BEFORE_START_RAFT_GROUP, 
params)
-                .thenComposeAsync(v -> startCmgRaftService(nodeNames), 
scheduledExecutor)
+                .thenComposeAsync(v -> inBusyLockAsync(() -> 
startCmgRaftService(nodeNames)), scheduledExecutor)
                 .whenComplete((v, e) -> {
                     if (e != null) {
                         LOG.warn("Error when initializing the CMG", e);
@@ -823,13 +822,13 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
      */
     private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState 
state) {
         return startCmgRaftServiceWithEvents(state.cmgNodes(), 
state.initialClusterConfiguration())
-                .thenCompose(service -> {
+                .thenCompose(service -> inBusyLockAsync(() -> {
                     var localState = new LocalState(state.cmgNodes(), 
state.clusterTag());
 
                     localStateStorage.saveLocalState(localState);
 
                     return validateAgainstCluster(service, state.clusterTag());
-                });
+                }));
     }
 
     private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService 
raftService) {
@@ -1138,6 +1137,18 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                 });
     }
 
+    private void inBusyLock(Runnable action) {
+        IgniteUtils.inBusyLock(busyLock, action);
+    }
+
+    private <T> T inBusyLock(Supplier<T> action) {
+        return IgniteUtils.inBusyLock(busyLock, action);
+    }
+
+    private <T> CompletableFuture<T> 
inBusyLockAsync(Supplier<CompletableFuture<T>> action) {
+        return IgniteUtils.inBusyLockAsync(busyLock, action);
+    }
+
     @TestOnly
     LogicalTopologyImpl logicalTopologyImpl() {
         return (LogicalTopologyImpl) logicalTopology;

Reply via email to