This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6fcadcb933 IGNITE-22241 Make ClusterManagementGroupManager extensible
(#3756)
6fcadcb933 is described below
commit 6fcadcb933c6cffd35b2d43f197c4a6f16a30b85
Author: Tiago Marques Godinho <[email protected]>
AuthorDate: Fri May 24 07:15:41 2024 +0100
IGNITE-22241 Make ClusterManagementGroupManager extensible (#3756)
---
modules/cluster-management/build.gradle | 2 +
.../management/ClusterManagementGroupManager.java | 64 ++++++++++++++++++----
.../BeforeStartRaftGroupEventParameters.java | 51 +++++++++++++++++
.../events/ClusterManagerGroupEvent.java | 32 +++++++++++
.../management/events/EmptyEventParameters.java | 29 ++++++++++
.../internal/cluster/management/MockNode.java | 7 ++-
.../ItMetaStorageMultipleNodesAbstractTest.java | 10 +++-
.../metastorage/impl/ItMetaStorageWatchTest.java | 7 ++-
.../ItDistributedConfigurationPropertiesTest.java | 11 +++-
.../ItDistributedConfigurationStorageTest.java | 12 +++-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../rebalance/ItRebalanceDistributedTest.java | 6 +-
13 files changed, 214 insertions(+), 23 deletions(-)
diff --git a/modules/cluster-management/build.gradle
b/modules/cluster-management/build.gradle
index e412100d46..ceb8530dfb 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-configuration')
implementation project(':ignite-configuration-api')
implementation project(':ignite-configuration-presentation')
+ implementation project(':ignite-failure-handler')
implementation project(':ignite-network')
implementation project(':ignite-raft-api')
implementation project(':ignite-vault')
@@ -53,6 +54,7 @@ dependencies {
testImplementation libs.mockito.junit
testFixturesImplementation project(':ignite-core')
+ testFixturesImplementation project(':ignite-failure-handler')
testFixturesImplementation project(':ignite-raft')
testFixturesImplementation project(':ignite-raft-api')
testFixturesImplementation project(':ignite-storage-api')
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 6efd1feea7..3b4f951417 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -23,10 +23,12 @@ import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static
org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.util.Collection;
import java.util.List;
@@ -41,6 +43,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import
org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
+import
org.apache.ignite.internal.cluster.management.events.BeforeStartRaftGroupEventParameters;
+import
org.apache.ignite.internal.cluster.management.events.ClusterManagerGroupEvent;
+import
org.apache.ignite.internal.cluster.management.events.EmptyEventParameters;
import
org.apache.ignite.internal.cluster.management.network.CmgMessageHandlerFactory;
import
org.apache.ignite.internal.cluster.management.network.messages.CancelInitMessage;
import
org.apache.ignite.internal.cluster.management.network.messages.ClusterStateMessage;
@@ -56,6 +61,10 @@ import
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.event.AbstractEventProducer;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -85,7 +94,8 @@ import org.jetbrains.annotations.TestOnly;
* <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">IEP-77</a>
* for the description of the Cluster Management Group and its
responsibilities.
*/
-public class ClusterManagementGroupManager implements IgniteComponent {
+public class ClusterManagementGroupManager extends
AbstractEventProducer<ClusterManagerGroupEvent, EventParameters>
+ implements IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(ClusterManagementGroupManager.class);
/** Busy lock to stop synchronously. */
@@ -134,6 +144,9 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
/** Future that resolves into the initial cluster configuration in HOCON
format. */
private final CompletableFuture<String> initialClusterConfigurationFuture
= new CompletableFuture<>();
+ /** Failure processor that is used to handle critical errors. */
+ private final FailureProcessor failureProcessor;
+
/** Constructor. */
public ClusterManagementGroupManager(
VaultManager vault,
@@ -143,7 +156,8 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
ClusterStateStorage clusterStateStorage,
LogicalTopology logicalTopology,
ClusterManagementConfiguration configuration,
- NodeAttributes nodeAttributes
+ NodeAttributes nodeAttributes,
+ FailureProcessor failureProcessor
) {
this.clusterService = clusterService;
this.clusterInitializer = clusterInitializer;
@@ -153,6 +167,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
this.configuration = configuration;
this.localStateStorage = new LocalStateStorage(vault);
this.nodeAttributes = nodeAttributes;
+ this.failureProcessor = failureProcessor;
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
NamedThreadFactory.create(clusterService.nodeName(),
"cmg-manager", LOG)
@@ -257,7 +272,8 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
LOG.info("Local CMG state recovered, starting the CMG");
- return startCmgRaftService(localState.cmgNodeNames())
+ // Since we recovered state we do not supply a new
initialClusterConfig.
+ return startCmgRaftServiceWithEvents(localState.cmgNodeNames(), null)
.thenCompose(service -> joinCluster(service,
localState.clusterTag()));
}
@@ -285,7 +301,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
// Raft service has not been started
LOG.info("Init command received, starting the CMG [nodes={}]",
msg.cmgNodes());
- serviceFuture = startCmgRaftService(msg.cmgNodes());
+ serviceFuture = startCmgRaftServiceWithEvents(msg.cmgNodes(),
msg.initialClusterConfiguration());
} else {
// Raft service has been started, which means that this node
has already received an init command at least once.
LOG.info("Init command received, but the CMG has already been
started");
@@ -426,8 +442,20 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
private void handleCancelInit(CancelInitMessage msg) {
LOG.info("CMG initialization cancelled [reason={}]", msg.reason());
-
- destroyCmg();
+ this.scheduledExecutor.execute(this::destroyCmgWithEvents);
+ }
+
+ /** Delegates call to {@link #destroyCmg()} but fires the associated
events. */
+ private CompletableFuture<Void> destroyCmgWithEvents() {
+ LOG.info("CMG cancellation procedure started");
+ return inBusyLockAsync(busyLock,
+ () ->
fireEvent(ClusterManagerGroupEvent.BEFORE_DESTROY_RAFT_GROUP,
EmptyEventParameters.INSTANCE)
+ .thenRunAsync(this::destroyCmg, this.scheduledExecutor)
+ .exceptionally(err -> {
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, err));
+ throw (err instanceof RuntimeException) ?
(RuntimeException) err : new CompletionException(err);
+ })
+ );
}
/**
@@ -493,15 +521,16 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
LOG.warn("CMG service could not be started on
previous attempts. "
+ "Re-creating the CMG Raft service
[reason={}]", e, e.getMessage());
+
+ return initCmgRaftService(state);
} else {
LOG.warn("CMG service started, but the cluster
state is different. "
+ "Re-creating the CMG Raft
service [localState={}, clusterState={}]",
service.nodeNames(), state.cmgNodes());
- destroyCmg();
+ return destroyCmgWithEvents()
+ .thenCompose(none ->
initCmgRaftService(state));
}
-
- return initCmgRaftService(state);
})
.thenCompose(Function.identity());
}
@@ -522,6 +551,19 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
});
}
+ /**
+ * Delegates call to {@link #startCmgRaftService(Set)} but fires the
associated events.
+ *
+ * @param initialClusterConfig the initial cluster configuration provided
by the
+ * {@link CmgInitMessage#initialClusterConfiguration()} if the
cluster is being initialized for the first time, as part of a
+ * cluster init. Otherwise {@code null}, if starting after
recovering state of an already initialized cluster.
+ */
+ private CompletableFuture<CmgRaftService>
startCmgRaftServiceWithEvents(Set<String> nodeNames, @Nullable String
initialClusterConfig) {
+ BeforeStartRaftGroupEventParameters params = new
BeforeStartRaftGroupEventParameters(nodeNames, initialClusterConfig);
+ return fireEvent(ClusterManagerGroupEvent.BEFORE_START_RAFT_GROUP,
params)
+ .thenCompose(v -> startCmgRaftService(nodeNames));
+ }
+
/**
* Starts the CMG Raft service using the provided node names as its peers.
*/
@@ -578,7 +620,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
* Starts the CMG Raft service using the given {@code state} and persists
it to the local storage.
*/
private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState
state) {
- return startCmgRaftService(state.cmgNodes())
+ return startCmgRaftServiceWithEvents(state.cmgNodes(),
state.initialClusterConfiguration())
.thenCompose(service -> {
var localState = new LocalState(state.cmgNodes(),
state.clusterTag());
@@ -700,7 +742,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
initialClusterConfigurationFuture.completeExceptionally(new
NodeStoppingException());
- return nullCompletedFuture();
+ return fireEvent(ClusterManagerGroupEvent.AFTER_STOP_RAFT_GROUP,
EmptyEventParameters.INSTANCE);
}
/**
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/BeforeStartRaftGroupEventParameters.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/BeforeStartRaftGroupEventParameters.java
new file mode 100644
index 0000000000..70c8df74c7
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/BeforeStartRaftGroupEventParameters.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.events;
+
+import java.util.Set;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage;
+import org.apache.ignite.internal.event.EventParameters;
+import org.jetbrains.annotations.Nullable;
+
+/** Transparent data container for the {@link
ClusterManagerGroupEvent#BEFORE_START_RAFT_GROUP}. */
+public class BeforeStartRaftGroupEventParameters implements EventParameters {
+ private final Set<String> nodeNames;
+ private final @Nullable String initialClusterConfig;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeNames The names of the nodes in the cluster. This set is
copied internally.
+ * @param initialClusterConfig the initial cluster configuration provided
by the {@link CmgInitMessage#initialClusterConfiguration()},
+ * if the cluster is being initialized for the first time, as part of
a cluster init. Otherwise {@code null}, if starting after
+ * recovering state of an already initialized cluster.
+ */
+ public BeforeStartRaftGroupEventParameters(Set<String> nodeNames,
@Nullable String initialClusterConfig) {
+ this.nodeNames = Set.copyOf(nodeNames);
+ this.initialClusterConfig = initialClusterConfig;
+ }
+
+ public Set<String> nodeNames() {
+ return nodeNames;
+ }
+
+ @Nullable
+ public String initialClusterConfig() {
+ return initialClusterConfig;
+ }
+}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/ClusterManagerGroupEvent.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/ClusterManagerGroupEvent.java
new file mode 100644
index 0000000000..314404b0d8
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/ClusterManagerGroupEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.events;
+
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.event.Event;
+
+/** Enum with events for the {@link ClusterManagementGroupManager}. */
+
+public enum ClusterManagerGroupEvent implements Event {
+ /** Fired before starting the cmg raft group. */
+ BEFORE_START_RAFT_GROUP,
+ /** Fired before destroying the cmg raft group and cleaning the local
state. */
+ BEFORE_DESTROY_RAFT_GROUP,
+ /** Fired after stopping the cmg raft group. */
+ AFTER_STOP_RAFT_GROUP
+}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/EmptyEventParameters.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/EmptyEventParameters.java
new file mode 100644
index 0000000000..53de4140e0
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/events/EmptyEventParameters.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.events;
+
+import org.apache.ignite.internal.event.EventParameters;
+
+/** Empty Event Parameters Singleton. */
+public class EmptyEventParameters implements EventParameters {
+ public static final EmptyEventParameters INSTANCE = new
EmptyEventParameters();
+
+ private EmptyEventParameters() {
+ // Intentionally left blank.
+ }
+}
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 1a1cf3420e..094cfe022d 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImp
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
@@ -101,6 +102,8 @@ public class MockNode {
var clusterStateStorage = new
RocksDbClusterStateStorage(this.workDir.resolve("cmg"),
clusterService.nodeName());
+ FailureProcessor failureProcessor = new FailureProcessor(nodeName);
+
this.clusterManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -109,7 +112,8 @@ public class MockNode {
clusterStateStorage,
new LogicalTopologyImpl(clusterStateStorage),
cmgConfiguration,
- new NodeAttributesCollector(nodeAttributes,
storageProfilesConfiguration)
+ new NodeAttributesCollector(nodeAttributes,
storageProfilesConfiguration),
+ failureProcessor
);
components = List.of(
@@ -117,6 +121,7 @@ public class MockNode {
clusterService,
raftManager,
clusterStateStorage,
+ failureProcessor,
clusterManager
);
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 557ad690f1..9ff9059bd6 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -58,6 +58,7 @@ import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -138,6 +139,8 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
/** The future have to be complete after the node start and all Meta
storage watches are deployd. */
private final CompletableFuture<Void> deployWatchesFut;
+ private final FailureProcessor failureProcessor;
+
Node(ClusterService clusterService, Path dataPath) {
this.clusterService = clusterService;
@@ -166,6 +169,8 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
new TestConfigurationValidator()
);
+ this.failureProcessor = new FailureProcessor(name());
+
this.cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -174,7 +179,8 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
clusterStateStorage,
logicalTopology,
cmgConfiguration,
- new NodeAttributesCollector(nodeAttributes,
storageConfiguration)
+ new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
+ failureProcessor
);
var logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
@@ -207,6 +213,7 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
clusterService,
raftManager,
clusterStateStorage,
+ failureProcessor,
cmgManager,
metaStorageManager
);
@@ -229,6 +236,7 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
List<IgniteComponent> components = List.of(
metaStorageManager,
cmgManager,
+ failureProcessor,
raftManager,
clusterStateStorage,
clusterService,
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index a87ebe6e13..ae241f68e4 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -59,6 +59,7 @@ import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -156,6 +157,9 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
new TestConfigurationValidator()
);
+ FailureProcessor failureProcessor = new FailureProcessor(name());
+ components.add(failureProcessor);
+
this.cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -164,7 +168,8 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
clusterStateStorage,
logicalTopology,
cmgConfiguration,
- new NodeAttributesCollector(nodeAttributes,
storageConfiguration)
+ new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
+ failureProcessor
);
components.add(cmgManager);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 4ef7ae492c..10921b1a1b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -53,6 +53,7 @@ import
org.apache.ignite.internal.configuration.storage.DistributedConfiguration
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -131,6 +132,8 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
/** The future have to be complete after the node start and all Meta
storage watches are deployd. */
private final CompletableFuture<Void> deployWatchesFut;
+ private final FailureProcessor failureProcessor;
+
/** Flag that disables storage updates. */
private volatile boolean receivesUpdates = true;
@@ -173,6 +176,8 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
new TestConfigurationValidator()
);
+ this.failureProcessor = new
FailureProcessor(clusterService.nodeName());
+
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -181,7 +186,8 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- new NodeAttributesCollector(nodeAttributes,
storageConfiguration)
+ new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
+ failureProcessor
);
var logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
@@ -236,7 +242,7 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
*/
CompletableFuture<Void> start() {
assertThat(
- startAsync(vaultManager, clusterService, raftManager,
cmgManager, metaStorageManager),
+ startAsync(vaultManager, clusterService, raftManager,
failureProcessor, cmgManager, metaStorageManager),
willCompleteSuccessfully()
);
@@ -257,6 +263,7 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
var components = List.of(
distributedCfgManager,
cmgManager,
+ failureProcessor,
metaStorageManager,
raftManager,
clusterService,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 17cf8b3f03..791692cefa 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -109,6 +110,8 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
/** The future have to be complete after the node start and all Meta
storage watches are deployd. */
private final CompletableFuture<Void> deployWatchesFut;
+ private final FailureProcessor failureProcessor;
+
/**
* Constructor that simply creates a subset of components of this node.
*/
@@ -145,6 +148,8 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
new TestConfigurationValidator()
);
+ this.failureProcessor = new
FailureProcessor(clusterService.nodeName());
+
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -153,7 +158,8 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- new NodeAttributesCollector(nodeAttributes,
storageConfiguration)
+ new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
+ failureProcessor
);
var logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
@@ -187,7 +193,7 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
*/
void start() {
assertThat(
- startAsync(vaultManager, clusterService, raftManager,
cmgManager, metaStorageManager),
+ startAsync(vaultManager, clusterService, raftManager,
failureProcessor, cmgManager, metaStorageManager),
willCompleteSuccessfully()
);
@@ -207,7 +213,7 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
*/
void stop() {
var components =
- List.of(metaStorageManager, cmgManager, raftManager,
clusterService, vaultManager);
+ List.of(metaStorageManager, cmgManager, failureProcessor,
raftManager, clusterService, vaultManager);
for (IgniteComponent igniteComponent : components) {
igniteComponent.beforeNodeStop();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index d33ff17a4c..ac876f0bd5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -372,7 +372,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
logicalTopology,
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes,
-
nodeCfgMgr.configurationRegistry().getConfiguration(StorageConfiguration.KEY))
+
nodeCfgMgr.configurationRegistry().getConfiguration(StorageConfiguration.KEY)),
+ failureProcessor
);
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 40f6d8194a..0e9c1fcf30 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -550,7 +550,8 @@ public class IgniteImpl implements Ignite {
clusterStateStorage,
logicalTopology,
nodeConfigRegistry.getConfiguration(ClusterManagementConfiguration.KEY),
- nodeAttributesCollector
+ nodeAttributesCollector,
+ failureProcessor
);
logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgMgr);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 8e963c6e34..ac7e294e7c 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1056,6 +1056,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
new TestConfigurationValidator()
);
+ failureProcessor = new FailureProcessor(name);
+
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -1064,7 +1066,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- new NodeAttributesCollector(nodeAttributes,
storageConfiguration)
+ new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
+ failureProcessor
);
LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
@@ -1146,7 +1149,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
Path storagePath = dir.resolve("storage");
- failureProcessor = new FailureProcessor(name);
dataStorageMgr = new DataStorageManager(
dataStorageModules.createStorageEngines(