This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 102ae3fd26 HDDS-8660. Notify ReplicationManager when nodes go dead or out of service (#7997)
102ae3fd26 is described below
commit 102ae3fd26ea94e484cd530711236d7fe2cb449d
Author: Peter Lee <[email protected]>
AuthorDate: Wed Apr 9 17:31:47 2025 +0800
HDDS-8660. Notify ReplicationManager when nodes go dead or out of service (#7997)
---
.../container/replication/ReplicationManager.java | 42 +++-
.../ReplicationManagerEventHandler.java | 51 ++++
.../container/replication/ReplicationQueue.java | 4 +
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 4 +
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 17 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 27 ++-
.../hdds/scm/server/StorageContainerManager.java | 6 +
.../replication/TestReplicationManager.java | 60 +++++
.../TestReplicationManagerEventHandler.java | 72 ++++++
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 10 +-
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 61 +++++
.../TestReplicationManagerIntegration.java | 263 +++++++++++++++++++++
12 files changed, 609 insertions(+), 8 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 09245b2bee..89c2d16d75 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -115,7 +115,7 @@ public class ReplicationManager implements SCMService,
ContainerReplicaPendingOp
/**
* SCMContext from StorageContainerManager.
*/
- private final SCMContext scmContext;
+ private SCMContext scmContext;
/**
@@ -918,6 +918,10 @@ public ReplicationManagerReport getContainerReport() {
return containerReport;
}
+  /**
+   * Check whether the replication monitor thread is currently parked in a
+   * timed wait, i.e. it is idle between two processing passes.
+   *
+   * @return true if the monitor thread exists and is in
+   *     {@link Thread.State#TIMED_WAITING}; false if the thread has not been
+   *     created yet or is in any other state.
+   */
+  public boolean isThreadWaiting() {
+    // Read the field once and guard against it being unset. NOTE(review):
+    // the thread is presumably only created when the manager is started;
+    // calling this earlier used to throw a NullPointerException.
+    final Thread monitor = replicationMonitor;
+    return monitor != null
+        && monitor.getState() == Thread.State.TIMED_WAITING;
+  }
+
/**
* ReplicationMonitor thread runnable. This wakes up at configured
* interval and processes all the containers in the system.
@@ -1410,6 +1414,11 @@ public boolean shouldRun() {
}
}
+ /**
+ * Replace the SCMContext used by this ReplicationManager. This is a test
+ * hook; for example, TestDeadNodeHandler injects its own context here.
+ * NOTE(review): the scmContext field is no longer final and is not
+ * volatile, so a replacement may not be visible to other threads (such as
+ * the replication monitor) without further synchronization.
+ *
+ * @param context the SCMContext to use from now on
+ */
+ @VisibleForTesting
+ public void setScmContext(SCMContext context) {
+ scmContext = context;
+ }
+
@Override
public String getServiceName() {
return ReplicationManager.class.getSimpleName();
@@ -1492,5 +1501,36 @@ public boolean hasHealthyPipeline(ContainerInfo
container) {
return false;
}
}
+
+  /**
+   * Notify the ReplicationManager that a node state has changed, which might
+   * require container replication. This wakes up the replication monitor
+   * thread when it is idle in its timed wait and no replication work is
+   * already queued.
+   *
+   * @return true if the replication monitor was woken up, false otherwise
+   */
+  public synchronized boolean notifyNodeStateChange() {
+    if (!running || serviceStatus == ServiceStatus.PAUSING) {
+      return false;
+    }
+
+    if (!isThreadWaiting()) {
+      LOG.debug("Replication monitor is running, not need to wake it up");
+      return false;
+    }
+
+    // Only wake up the thread if there's no active replication work.
+    // This prevents creating a new replication queue over and over
+    // when multiple nodes change state in quick succession.
+    if (!getQueue().isEmpty()) {
+      LOG.debug("Replication queue is not empty, not waking up replication monitor");
+      return false;
+    }
+
+    LOG.debug("Waking up replication monitor due to node state change");
+    // notify() targets threads waiting on this object's monitor; the
+    // synchronized modifier provides the lock notify() requires.
+    // NOTE(review): assumes the replication monitor waits on `this`.
+    notify();
+    return true;
+  }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java
new file mode 100644
index 0000000000..c63c44596d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles events related to the ReplicationManager: on a datanode state
+ * change notification it asks the {@link ReplicationManager} to wake its
+ * replication monitor so the change is acted on without waiting for the
+ * next scheduled pass.
+ */
+public class ReplicationManagerEventHandler implements EventHandler<DatanodeDetails> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationManagerEventHandler.class);
+
+  private final ReplicationManager replicationManager;
+  private final SCMContext scmContext;
+
+  /**
+   * @param replicationManager the manager to notify of node state changes
+   * @param scmContext context used to check leadership and safe mode before
+   *     notifying
+   */
+  public ReplicationManagerEventHandler(ReplicationManager replicationManager, SCMContext scmContext) {
+    this.replicationManager = replicationManager;
+    this.scmContext = scmContext;
+  }
+
+  /**
+   * Forward a node state change to the ReplicationManager, but only when
+   * this SCM is the ready leader and is out of safe mode.
+   *
+   * @param datanodeDetails the datanode whose state changed (only logged)
+   * @param eventPublisher unused
+   */
+  @Override
+  public void onMessage(DatanodeDetails datanodeDetails, EventPublisher eventPublisher) {
+    if (!scmContext.isLeaderReady() || scmContext.isInSafeMode()) {
+      // same condition in ReplicationManager
+      return;
+    }
+    LOG.debug("ReplicationManagerEventHandler received event for datanode: {}", datanodeDetails);
+    replicationManager.notifyNodeStateChange();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
index 9e17d67479..ee869515ce 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
@@ -97,4 +97,8 @@ public int overReplicatedQueueSize() {
return overRepQueue.size();
}
+  /**
+   * Report whether there is no queued replication work.
+   *
+   * @return true when both the under-replicated and the over-replicated
+   *     queues hold no entries
+   */
+  public boolean isEmpty() {
+    if (!underRepQueue.isEmpty()) {
+      return false;
+    }
+    return overRepQueue.isEmpty();
+  }
+
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index b72f786acd..797a6dfd61 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -212,6 +212,10 @@ public final class SCMEvents {
new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class,
"Delete_Block_Status");
+ /**
+ * Asks the ReplicationManager to wake its replication monitor because a
+ * datanode's state changed: fired by DeadNodeHandler when a node that is
+ * not in maintenance goes dead, and by SCMNodeManager when a node's
+ * persisted operational state changes to DECOMMISSIONING,
+ * ENTERING_MAINTENANCE or IN_SERVICE. The payload is the affected datanode.
+ */
+ public static final TypedEvent<DatanodeDetails>
+ REPLICATION_MANAGER_NOTIFY =
+ new TypedEvent<>(DatanodeDetails.class, "Replication_Manager_Notify");
+
/**
* Private Ctor. Never Constructed.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 69de282e81..9f69d9456d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -91,12 +92,21 @@ public void onMessage(final DatanodeDetails datanodeDetails,
closeContainers(datanodeDetails, publisher);
destroyPipelines(datanodeDetails);
+ boolean isNodeInMaintenance =
nodeManager.getNodeStatus(datanodeDetails).isInMaintenance();
+
// Remove the container replicas associated with the dead node unless it
// is IN_MAINTENANCE
- if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
+ if (!isNodeInMaintenance) {
removeContainerReplicas(datanodeDetails);
}
-
+
+ // Notify ReplicationManager
+ if (!isNodeInMaintenance) {
+ LOG.debug("Notifying ReplicationManager about dead node: {}",
+ datanodeDetails);
+ publisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY,
datanodeDetails);
+ }
+
// remove commands in command queue for the DN
final List<SCMCommand<?>> cmdList = nodeManager.getCommandQueue(
datanodeDetails.getUuid());
@@ -105,8 +115,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,
// remove DeleteBlocksCommand associated with the dead node unless it
// is IN_MAINTENANCE
- if (deletedBlockLog != null &&
- !nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
+ if (deletedBlockLog != null && !isNodeInMaintenance) {
deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 5d72d7428f..f950e8719e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -636,9 +636,34 @@ protected void updateDatanodeOpState(DatanodeDetails
reportedDn)
}
}
DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
+ NodeOperationalState oldPersistedOpState = scmDnd.getPersistedOpState();
+ NodeOperationalState newPersistedOpState =
reportedDn.getPersistedOpState();
+
scmDnd.setPersistedOpStateExpiryEpochSec(
reportedDn.getPersistedOpStateExpiryEpochSec());
- scmDnd.setPersistedOpState(reportedDn.getPersistedOpState());
+ scmDnd.setPersistedOpState(newPersistedOpState);
+
+ maybeNotifyReplicationManager(reportedDn, oldPersistedOpState,
newPersistedOpState);
+ }
+
+  /**
+   * Fire {@link SCMEvents#REPLICATION_MANAGER_NOTIFY} when a datanode's
+   * persisted operational state changed in a way that may create new
+   * replication work: the node started decommissioning, started entering
+   * maintenance, or returned to service. Only the leader SCM publishes.
+   *
+   * @param datanode the datanode whose state was reported
+   * @param oldState the persisted operational state SCM had before the report
+   * @param newState the persisted operational state carried by the report
+   */
+  private void maybeNotifyReplicationManager(
+      DatanodeDetails datanode,
+      NodeOperationalState oldState,
+      NodeOperationalState newState) {
+    if (!scmContext.isLeader() || oldState == newState) {
+      return;
+    }
+
+    // Notify when a node is entering maintenance, decommissioning or back to service
+    if (newState == NodeOperationalState.ENTERING_MAINTENANCE
+        || newState == NodeOperationalState.DECOMMISSIONING
+        || newState == NodeOperationalState.IN_SERVICE) {
+      LOG.info("Notifying ReplicationManager of node state change for {}: {} -> {}",
+          datanode, oldState, newState);
+      scmNodeEventPublisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode);
+    }
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 3d1807a819..1e1eb4cbe2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -95,6 +95,7 @@
import
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import
org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerEventHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@@ -495,11 +496,16 @@ private void initializeEventHandlers() {
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, scmContext);
+ ReplicationManagerEventHandler replicationManagerEventHandler =
+ new ReplicationManagerEventHandler(replicationManager, scmContext);
+
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND,
scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND_COUNT_UPDATED,
new DatanodeCommandCountUpdatedHandler(replicationManager));
+ eventQueue.addHandler(SCMEvents.REPLICATION_MANAGER_NOTIFY,
+ replicationManagerEventHandler);
// Use the same executor for both ICR and FCR.
// The Executor maps the event to a thread for DN.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 7555e1ab88..935d371bd0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -61,6 +61,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -97,6 +99,7 @@
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Lists;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.jupiter.api.AfterEach;
@@ -104,6 +107,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
/**
@@ -1648,6 +1652,62 @@ public void testPendingOpExpiry() throws
ContainerNotFoundException {
assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline());
}
+  /**
+   * notifyNodeStateChange() must wake an idle replication monitor (which then
+   * runs processAll() again) only when the replication queue is empty.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty)
+      throws IOException, InterruptedException, ReflectiveOperationException, TimeoutException {
+
+    AtomicBoolean processAllCalled = new AtomicBoolean(false);
+    ReplicationQueue queue = mock(ReplicationQueue.class);
+    when(queue.isEmpty()).thenReturn(queueIsEmpty);
+    final ReplicationManager customRM = new ReplicationManager(
+        configuration,
+        containerManager,
+        ratisPlacementPolicy,
+        ecPlacementPolicy,
+        eventPublisher,
+        scmContext,
+        nodeManager,
+        clock,
+        containerReplicaPendingOps) {
+      @Override
+      public ReplicationQueue getQueue() {
+        return queue;
+      }
+
+      @Override
+      public synchronized void processAll() {
+        processAllCalled.set(true);
+      }
+    };
+
+    customRM.notifyStatusChanged();
+    customRM.start();
+    try {
+      // Wait for the monitor thread to finish its first pass and park in its
+      // timed wait.
+      GenericTestUtils.waitFor(customRM::isThreadWaiting, 100, 1000);
+
+      // The processAll method is called when the ReplicationManager's run
+      // method is executed by the replicationMonitor thread.
+      assertTrue(processAllCalled.get());
+      processAllCalled.set(false);
+
+      assertThat(customRM.notifyNodeStateChange()).isEqualTo(queueIsEmpty);
+
+      if (queueIsEmpty) {
+        // Wait for the woken thread to run processAll() again. Waiting on the
+        // thread state instead is racy: right after notify() the thread may
+        // still report TIMED_WAITING before it has done any work.
+        GenericTestUtils.waitFor(processAllCalled::get, 100, 1000);
+      } else {
+        GenericTestUtils.waitFor(customRM::isThreadWaiting, 100, 1000);
+      }
+
+      // processAll() should only have run again if the queue was empty.
+      assertEquals(queueIsEmpty, processAllCalled.get());
+    } finally {
+      // Always stop the monitor thread, even if an assertion failed.
+      customRM.stop();
+    }
+  }
+
@SafeVarargs
private final Set<ContainerReplica> addReplicas(ContainerInfo container,
ContainerReplicaProto.State replicaState,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java
new file mode 100644
index 0000000000..480c873cbd
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@link ReplicationManagerEventHandler}: a node state change must
+ * only be forwarded to the ReplicationManager when this SCM is the ready
+ * leader and is out of safe mode.
+ */
+public class TestReplicationManagerEventHandler {
+  private ReplicationManager replicationManager;
+  private ReplicationManagerEventHandler replicationManagerEventHandler;
+  private EventPublisher publisher;
+  private SCMContext scmContext;
+
+  @BeforeEach
+  public void setUp() {
+    scmContext = mock(SCMContext.class);
+    publisher = mock(EventPublisher.class);
+    replicationManager = mock(ReplicationManager.class);
+    replicationManagerEventHandler = new ReplicationManagerEventHandler(replicationManager, scmContext);
+  }
+
+  /**
+   * Each case is (isLeaderReady, isInSafeMode, isExpectedToNotify).
+   */
+  private static Stream<Arguments> testData() {
+    return Stream.of(
+        // Ready leader outside safe mode: the only combination that notifies.
+        Arguments.of(true, false, true),
+        Arguments.of(false, true, false),
+        Arguments.of(true, true, false),
+        Arguments.of(false, false, false)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("testData")
+  public void testReplicationManagerEventHandler(boolean isLeaderReady,
+      boolean isInSafeMode, boolean isExpectedToNotify) {
+    when(scmContext.isLeaderReady()).thenReturn(isLeaderReady);
+    when(scmContext.isInSafeMode()).thenReturn(isInSafeMode);
+
+    replicationManagerEventHandler.onMessage(
+        MockDatanodeDetails.randomDatanodeDetails(), publisher);
+
+    final int expectedNotifications = isExpectedToNotify ? 1 : 0;
+    verify(replicationManager, times(expectedNotifications)).notifyNodeStateChange();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index aedf64f926..674fe2b972 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -107,6 +108,7 @@ public void setup() throws IOException,
AuthenticationException {
pipelineManager =
(PipelineManagerImpl)scm.getPipelineManager();
pipelineManager.setScmContext(scmContext);
+ scm.getReplicationManager().setScmContext(scmContext);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
@@ -230,6 +232,10 @@ public void testOnMessage(@TempDir File tempDir) throws
Exception {
assertFalse(
nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
+ verify(publisher,
times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1);
+
+ clearInvocations(publisher);
+
verify(deletedBlockLog, times(0))
.onDatanodeDead(datanode1.getUuid());
@@ -259,8 +265,8 @@ public void testOnMessage(@TempDir File tempDir) throws
Exception {
nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
assertEquals(0, nodeManager.getCommandQueueCount(datanode1.getUuid(),
cmd.getType()));
- verify(deletedBlockLog, times(1))
- .onDatanodeDead(datanode1.getUuid());
+ verify(publisher).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY,
datanode1);
+ verify(deletedBlockLog).onDatanodeDead(datanode1.getUuid());
container1Replicas = containerManager
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index cb2315f7fd..89b1cbd3e1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -86,6 +86,7 @@
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -2038,4 +2039,64 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
assertEquals(emptyList(), nodeManager.getNodesByAddress(ipAddress));
}
}
+
+  /**
+   * Persisted operational state transitions reported in a heartbeat, each
+   * paired with whether SCMNodeManager is expected to fire
+   * REPLICATION_MANAGER_NOTIFY for it.
+   */
+  private static Stream<Arguments> nodeStateTransitions() {
+    return Stream.of(
+        // Start of an admin workflow: decommissioning or entering maintenance.
+        opStateTransition(HddsProtos.NodeOperationalState.IN_SERVICE,
+            HddsProtos.NodeOperationalState.DECOMMISSIONING, true),
+        opStateTransition(HddsProtos.NodeOperationalState.IN_SERVICE,
+            HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, true),
+        // Back to service (DataNodeAdminMonitor aborted the workflow, the
+        // maintenance end time expired, or the node is dead).
+        opStateTransition(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+            HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        opStateTransition(HddsProtos.NodeOperationalState.DECOMMISSIONED,
+            HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        opStateTransition(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+            HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        opStateTransition(HddsProtos.NodeOperationalState.IN_MAINTENANCE,
+            HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        // Admin workflow completed: no under/over-replicated containers remain
+        // on the node, so no notification is expected.
+        opStateTransition(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+            HddsProtos.NodeOperationalState.DECOMMISSIONED, false),
+        opStateTransition(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+            HddsProtos.NodeOperationalState.IN_MAINTENANCE, false)
+    );
+  }
+
+  /** Build one (oldState, newState, shouldNotify) argument set. */
+  private static Arguments opStateTransition(
+      HddsProtos.NodeOperationalState oldState,
+      HddsProtos.NodeOperationalState newState,
+      boolean shouldNotify) {
+    return Arguments.of(oldState, newState, shouldNotify);
+  }
+
+  /**
+   * A heartbeat carrying a new persisted operational state must fire
+   * REPLICATION_MANAGER_NOTIFY exactly when the transition can create
+   * replication work; setting the state directly on SCM must not fire it.
+   */
+  @ParameterizedTest
+  @MethodSource("nodeStateTransitions")
+  public void testNodeOperationalStateChange(
+      HddsProtos.NodeOperationalState oldState,
+      HddsProtos.NodeOperationalState newState,
+      boolean shouldNotify)
+      throws IOException, NodeNotFoundException, AuthenticationException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
+    when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
+    EventPublisher eventPublisher = mock(EventPublisher.class);
+    HDDSLayoutVersionManager lvm = new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion());
+    // The returned manager is not used. NOTE(review): the call is presumably
+    // kept for its side effect of initialising the scmContext field passed
+    // below; confirm before removing it.
+    createNodeManager(getConf());
+    SCMNodeManager nodeManager = new SCMNodeManager(conf,
+        scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
+        scmContext, lvm);
+    try {
+      DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails();
+      datanode.setPersistedOpState(oldState);
+      nodeManager.register(datanode, null, HddsTestUtils.getRandomPipelineReports());
+
+      // Changing the state on the SCM side alone must not notify.
+      nodeManager.setNodeOperationalState(datanode, newState, 0);
+      verify(eventPublisher, times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode);
+
+      DatanodeDetails reportedDatanode = MockDatanodeDetails.createDatanodeDetails(
+          datanode.getUuid());
+      reportedDatanode.setPersistedOpState(newState);
+
+      nodeManager.processHeartbeat(reportedDatanode);
+
+      verify(eventPublisher, times(shouldNotify ? 1 : 0)).fireEvent(
+          SCMEvents.REPLICATION_MANAGER_NOTIFY, reportedDatanode);
+    } finally {
+      // Close the node manager even when a verification fails.
+      nodeManager.close();
+    }
+  }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java
new file mode 100644
index 0000000000..8556a01e53
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.getDNHostAndPort;
+import static
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachHealthState;
+import static
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachOpState;
+import static
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachPersistedOpState;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneTestUtils;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneKeyLocation;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for ReplicationManager running in a MiniOzoneCluster.
+ *
+ * <p>The tests check that a closed RATIS/THREE container returns to three
+ * replicas after a datanode holding one of its replicas dies, and after such
+ * a datanode enters maintenance or is decommissioned and then recommissioned.
+ *
+ * <p>All tests share one cluster ({@code PER_CLASS} lifecycle), so their order
+ * matters: the node-death test runs last because it shuts a datanode down
+ * without restarting it.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class TestReplicationManagerIntegration {
+  private static final int DATANODE_COUNT = 5;
+  private static final int HEALTHY_REPLICA_NUM = 3;
+  private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
+      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationManagerIntegration.class);
+
+  private MiniOzoneCluster cluster;
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ReplicationManager replicationManager;
+  private StorageContainerManager scm;
+  private OzoneClient client;
+  private ContainerOperationClient scmClient;
+  private OzoneBucket bucket;
+
+  /**
+   * Starts a cluster with short heartbeat, report, stale/dead-node, admin
+   * monitor and ReplicationManager intervals so state changes are observed
+   * quickly, then creates the volume and bucket used by the tests.
+   */
+  @BeforeAll
+  void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        100, MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 100, MILLISECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 100, MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100,
+        MILLISECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, MILLISECONDS);
+    conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 100, MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 2, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+        1, SECONDS);
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
+        1, SECONDS);
+    conf.setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        0, SECONDS);
+    conf.set(OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+    conf.set(OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+    conf.set(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
+
+    ReplicationManagerConfiguration replicationConf =
+        conf.getObject(ReplicationManagerConfiguration.class);
+    replicationConf.setInterval(Duration.ofSeconds(1));
+    replicationConf.setUnderReplicatedInterval(Duration.ofMillis(100));
+    replicationConf.setOverReplicatedInterval(Duration.ofMillis(100));
+    conf.setFromObject(replicationConf);
+
+    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(DATANODE_COUNT);
+
+    cluster = builder.build();
+    cluster.getConf().setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        0, SECONDS);
+    cluster.waitForClusterToBeReady();
+
+    scm = cluster.getStorageContainerManager();
+    nodeManager = scm.getScmNodeManager();
+    containerManager = scm.getContainerManager();
+    replicationManager = scm.getReplicationManager();
+
+    client = cluster.newClient();
+    scmClient = new ContainerOperationClient(cluster.getConf());
+    bucket = TestDataUtil.createVolumeAndBucket(client);
+  }
+
+  @AfterAll
+  void shutdown() {
+    IOUtils.close(LOG, client, scmClient, cluster);
+  }
+
+  /**
+   * Raises the ReplicationManager interval to 300 seconds and waits until
+   * its thread is waiting. The interval is never restored, and the later
+   * tests wait at most 30 seconds. From here on they can therefore only pass
+   * if ReplicationManager runs before its next scheduled cycle, presumably
+   * when woken by a REPLICATION_MANAGER_NOTIFY event.
+   */
+  @Order(1)
+  @Test
+  void testReplicationManagerNotify() throws Exception {
+    replicationManager.getConfig().setInterval(Duration.ofSeconds(300));
+    GenericTestUtils.waitFor(() -> replicationManager.isThreadWaiting(), 200,
+        30000);
+  }
+
+  /**
+   * Shuts down a datanode holding a replica of a closed container and waits
+   * until the container is back at three replicas, none of them on the dead
+   * node. Runs last because the datanode is not restarted.
+   */
+  @Order(Integer.MAX_VALUE)
+  @Test
+  void testClosedContainerReplicationWhenNodeDies() throws Exception {
+    ContainerID containerId = createKeyInClosedContainer();
+    final DatanodeDetails targetDatanode = findReplica(containerId);
+
+    cluster.shutdownHddsDatanode(targetDatanode);
+    waitForDnToReachHealthState(nodeManager, targetDatanode, DEAD);
+
+    // Wait until the dead node's replica is gone and the container is back
+    // at the healthy replica count.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        Set<ContainerReplica> replicas =
+            containerManager.getContainerReplicas(containerId);
+        boolean deadNodeNotInContainerReplica = replicas.stream()
+            .noneMatch(r -> r.getDatanodeDetails().equals(targetDatanode));
+        return deadNodeNotInContainerReplica
+            && replicas.size() == HEALTHY_REPLICA_NUM;
+      } catch (ContainerNotFoundException e) {
+        return false;
+      }
+    }, 100, 30000);
+  }
+
+  /**
+   * Moves a datanode holding a replica of a closed container into
+   * {@code expectedOpState} and checks the replica count. It then
+   * recommissions the datanode and waits for the container to settle back
+   * at three replicas.
+   *
+   * @param expectedOpState IN_MAINTENANCE or DECOMMISSIONED
+   */
+  @ParameterizedTest
+  @EnumSource(value = NodeOperationalState.class,
+      names = {"IN_MAINTENANCE", "DECOMMISSIONED"})
+  void testClosedContainerReplicationWhenNodeDecommissionAndBackToInService(
+      NodeOperationalState expectedOpState)
+      throws Exception {
+    ContainerID containerId = createKeyInClosedContainer();
+    DatanodeDetails datanode = findReplica(containerId);
+    List<String> datanodeHosts =
+        Collections.singletonList(getDNHostAndPort(datanode));
+
+    if (expectedOpState == IN_MAINTENANCE) {
+      scmClient.startMaintenanceNodes(datanodeHosts, 0, false);
+      waitForDnToReachOpState(nodeManager, datanode, expectedOpState);
+      // Entering maintenance does not add replicas for this container.
+      assertEquals(HEALTHY_REPLICA_NUM,
+          containerManager.getContainerReplicas(containerId).size());
+    } else {
+      scmClient.decommissionNodes(datanodeHosts, false);
+      waitForDnToReachOpState(nodeManager, datanode, expectedOpState);
+      // The decommissioned node's replica no longer counts as healthy, so a
+      // replacement exists elsewhere while the original is still reported.
+      assertEquals(HEALTHY_REPLICA_NUM + 1,
+          containerManager.getContainerReplicas(containerId).size());
+    }
+
+    // Bring the node back to service; any surplus replica should then be
+    // removed until exactly the healthy replica count remains.
+    scmClient.recommissionNodes(datanodeHosts);
+
+    waitForDnToReachOpState(nodeManager, datanode, IN_SERVICE);
+    waitForDnToReachPersistedOpState(datanode, IN_SERVICE);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return containerManager.getContainerReplicas(containerId).size()
+            == HEALTHY_REPLICA_NUM;
+      } catch (ContainerNotFoundException e) {
+        return false;
+      }
+    }, 200, 30000);
+  }
+
+  /**
+   * Writes a small RATIS/THREE key and closes the container holding it. Open
+   * containers are not processed further by ReplicationManager. Also checks
+   * that the container starts with the healthy replica count.
+   *
+   * @return the ID of the closed container holding the new key
+   */
+  private ContainerID createKeyInClosedContainer() throws Exception {
+    String keyName = "key-" + UUID.randomUUID();
+    TestDataUtil.createKey(bucket, keyName, RATIS_REPLICATION_CONFIG,
+        "this is the content".getBytes(StandardCharsets.UTF_8));
+
+    OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+    List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations();
+    ContainerID containerId =
+        ContainerID.valueOf(keyLocations.get(0).getContainerID());
+    ContainerInfo containerInfo = containerManager.getContainer(containerId);
+    OzoneTestUtils.closeContainer(scm, containerInfo);
+
+    assertEquals(HEALTHY_REPLICA_NUM,
+        containerManager.getContainerReplicas(containerId).size());
+    return containerId;
+  }
+
+  /**
+   * Returns the datanode of an arbitrary replica of the given container.
+   *
+   * @throws AssertionError if the container has no replicas
+   */
+  private DatanodeDetails findReplica(ContainerID containerId)
+      throws ContainerNotFoundException {
+    return containerManager.getContainerReplicas(containerId).stream()
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("Replica not found for "
+            + containerId))
+        .getDatanodeDetails();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]