This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 102ae3fd26 HDDS-8660. Notify ReplicationManager when nodes go dead or 
out of service (#7997)
102ae3fd26 is described below

commit 102ae3fd26ea94e484cd530711236d7fe2cb449d
Author: Peter Lee <[email protected]>
AuthorDate: Wed Apr 9 17:31:47 2025 +0800

    HDDS-8660. Notify ReplicationManager when nodes go dead or out of service 
(#7997)
---
 .../container/replication/ReplicationManager.java  |  42 +++-
 .../ReplicationManagerEventHandler.java            |  51 ++++
 .../container/replication/ReplicationQueue.java    |   4 +
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   4 +
 .../hadoop/hdds/scm/node/DeadNodeHandler.java      |  17 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  27 ++-
 .../hdds/scm/server/StorageContainerManager.java   |   6 +
 .../replication/TestReplicationManager.java        |  60 +++++
 .../TestReplicationManagerEventHandler.java        |  72 ++++++
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  10 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  61 +++++
 .../TestReplicationManagerIntegration.java         | 263 +++++++++++++++++++++
 12 files changed, 609 insertions(+), 8 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 09245b2bee..89c2d16d75 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -115,7 +115,7 @@ public class ReplicationManager implements SCMService, 
ContainerReplicaPendingOp
   /**
    * SCMContext from StorageContainerManager.
    */
-  private final SCMContext scmContext;
+  private SCMContext scmContext;
 
 
   /**
@@ -918,6 +918,10 @@ public ReplicationManagerReport getContainerReport() {
     return containerReport;
   }
 
+  public boolean isThreadWaiting() {
+    return replicationMonitor.getState() == Thread.State.TIMED_WAITING;
+  }
+
   /**
    * ReplicationMonitor thread runnable. This wakes up at configured
    * interval and processes all the containers in the system.
@@ -1410,6 +1414,11 @@ public boolean shouldRun() {
     }
   }
 
+  @VisibleForTesting
+  public void setScmContext(SCMContext context) {
+    scmContext = context;
+  }
+
   @Override
   public String getServiceName() {
     return ReplicationManager.class.getSimpleName();
@@ -1492,5 +1501,36 @@ public boolean hasHealthyPipeline(ContainerInfo 
container) {
       return false;
     }
   }
+
+  /**
+   * Notify the ReplicationManager that a node state has changed, which might
+   * require container replication. This will wake up the replication monitor
+   * thread if it's sleeping and there's no active replication work in 
progress.
+   * 
+   * @return true if the replication monitor was woken up, false otherwise
+   */
+  public synchronized boolean notifyNodeStateChange() {
+    if (!running || serviceStatus == ServiceStatus.PAUSING) {
+      return false;
+    }
+
+    if (!isThreadWaiting()) {
+      LOG.debug("Replication monitor is running, no need to wake it up");
+      return false;
+    }
+
+    // Only wake up the thread if there's no active replication work
+    // This prevents creating a new replication queue over and over
+    // when multiple nodes change state in quick succession
+    if (getQueue().isEmpty()) {
+      LOG.debug("Waking up replication monitor due to node state change");
+      // Notify the replication monitor thread to wake up
+      notify();
+      return true;
+    } else {
+      LOG.debug("Replication queue is not empty, not waking up replication 
monitor");
+      return false;
+    }
+  }
 }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java
new file mode 100644
index 0000000000..c63c44596d
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles events related to the ReplicationManager.
+ */
+public class ReplicationManagerEventHandler implements 
EventHandler<DatanodeDetails> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationManagerEventHandler.class);
+
+  private final ReplicationManager replicationManager;
+  private final SCMContext scmContext;
+
+  public ReplicationManagerEventHandler(ReplicationManager replicationManager, 
SCMContext scmContext) {
+    this.replicationManager = replicationManager;
+    this.scmContext = scmContext;
+  }
+
+  @Override
+  public void onMessage(DatanodeDetails datanodeDetails, EventPublisher 
eventPublisher) {
+    if (!scmContext.isLeaderReady() || scmContext.isInSafeMode()) {
+      // same condition as in ReplicationManager
+      return;
+    }
+    LOG.debug("ReplicationManagerEventHandler received event for datanode: 
{}", datanodeDetails);
+    replicationManager.notifyNodeStateChange();
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
index 9e17d67479..ee869515ce 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
@@ -97,4 +97,8 @@ public int overReplicatedQueueSize() {
     return overRepQueue.size();
   }
 
+  public boolean isEmpty() {
+    return underRepQueue.isEmpty() && overRepQueue.isEmpty();
+  }
+
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index b72f786acd..797a6dfd61 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -212,6 +212,10 @@ public final class SCMEvents {
       new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class,
           "Delete_Block_Status");
 
+  public static final TypedEvent<DatanodeDetails>
+      REPLICATION_MANAGER_NOTIFY =
+      new TypedEvent<>(DatanodeDetails.class, "Replication_Manager_Notify");
+
   /**
    * Private Ctor. Never Constructed.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 69de282e81..9f69d9456d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -91,12 +92,21 @@ public void onMessage(final DatanodeDetails datanodeDetails,
       closeContainers(datanodeDetails, publisher);
       destroyPipelines(datanodeDetails);
 
+      boolean isNodeInMaintenance = 
nodeManager.getNodeStatus(datanodeDetails).isInMaintenance();
+
       // Remove the container replicas associated with the dead node unless it
       // is IN_MAINTENANCE
-      if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
+      if (!isNodeInMaintenance) {
         removeContainerReplicas(datanodeDetails);
       }
-      
+
+      // Notify ReplicationManager
+      if (!isNodeInMaintenance) {
+        LOG.debug("Notifying ReplicationManager about dead node: {}",
+            datanodeDetails);
+        publisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, 
datanodeDetails);
+      }
+
       // remove commands in command queue for the DN
       final List<SCMCommand<?>> cmdList = nodeManager.getCommandQueue(
           datanodeDetails.getUuid());
@@ -105,8 +115,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,
 
       // remove DeleteBlocksCommand associated with the dead node unless it
       // is IN_MAINTENANCE
-      if (deletedBlockLog != null &&
-          !nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
+      if (deletedBlockLog != null && !isNodeInMaintenance) {
         deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
       }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 5d72d7428f..f950e8719e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -636,9 +636,34 @@ protected void updateDatanodeOpState(DatanodeDetails 
reportedDn)
       }
     }
     DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
+    NodeOperationalState oldPersistedOpState = scmDnd.getPersistedOpState();
+    NodeOperationalState newPersistedOpState = 
reportedDn.getPersistedOpState();
+
     scmDnd.setPersistedOpStateExpiryEpochSec(
         reportedDn.getPersistedOpStateExpiryEpochSec());
-    scmDnd.setPersistedOpState(reportedDn.getPersistedOpState());
+    scmDnd.setPersistedOpState(newPersistedOpState);
+
+    maybeNotifyReplicationManager(reportedDn, oldPersistedOpState, 
newPersistedOpState);
+  }
+
+  private void maybeNotifyReplicationManager(
+      DatanodeDetails datanode,
+      NodeOperationalState oldState,
+      NodeOperationalState newState) {
+    if (!scmContext.isLeader()) {
+      return;
+    }
+
+    if (oldState != newState) {
+      // Notify when a node is entering maintenance, decommissioning or back 
to service
+      if (newState == NodeOperationalState.ENTERING_MAINTENANCE
+          || newState == NodeOperationalState.DECOMMISSIONING
+          || newState == NodeOperationalState.IN_SERVICE) {
+        LOG.info("Notifying ReplicationManager of node state change for {}: {} 
-> {}",
+            datanode, oldState, newState);
+        scmNodeEventPublisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, 
datanode);
+      }
+    }
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 3d1807a819..1e1eb4cbe2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -95,6 +95,7 @@
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import 
org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerEventHandler;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@@ -495,11 +496,16 @@ private void initializeEventHandlers() {
     PipelineActionHandler pipelineActionHandler =
         new PipelineActionHandler(pipelineManager, scmContext);
 
+    ReplicationManagerEventHandler replicationManagerEventHandler =
+        new ReplicationManagerEventHandler(replicationManager, scmContext);
+
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, 
scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND_COUNT_UPDATED,
         new DatanodeCommandCountUpdatedHandler(replicationManager));
+    eventQueue.addHandler(SCMEvents.REPLICATION_MANAGER_NOTIFY,
+        replicationManagerEventHandler);
 
     // Use the same executor for both ICR and FCR.
     // The Executor maps the event to a thread for DN.
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 7555e1ab88..935d371bd0 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -61,6 +61,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -97,6 +99,7 @@
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Lists;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.TestClock;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.jupiter.api.AfterEach;
@@ -104,6 +107,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 
 /**
@@ -1648,6 +1652,62 @@ public void testPendingOpExpiry() throws 
ContainerNotFoundException {
     assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty) 
+      throws IOException, InterruptedException, ReflectiveOperationException, 
TimeoutException {
+
+    AtomicBoolean processAllCalled = new AtomicBoolean(false);
+    ReplicationQueue queue = mock(ReplicationQueue.class);
+    when(queue.isEmpty()).thenReturn(queueIsEmpty);
+    final ReplicationManager customRM = new ReplicationManager(
+        configuration,
+        containerManager,
+        ratisPlacementPolicy,
+        ecPlacementPolicy,
+        eventPublisher,
+        scmContext,
+        nodeManager,
+        clock,
+        containerReplicaPendingOps) {
+          @Override
+          public ReplicationQueue getQueue() {
+            return queue;
+          }
+
+          @Override
+          public synchronized void processAll() {
+            processAllCalled.set(true);
+          }
+        };
+
+    customRM.notifyStatusChanged();
+    customRM.start();
+
+    // wait for the thread to become TIMED_WAITING
+    GenericTestUtils.waitFor(
+        () -> customRM.isThreadWaiting(),
+        100,
+        1000);
+
+    // The processAll method will be called when the ReplicationManager's run
+    // method is executed by the replicationMonitor thread.
+    assertTrue(processAllCalled.get());
+    processAllCalled.set(false);
+
+    assertThat(customRM.notifyNodeStateChange()).isEqualTo(queueIsEmpty);
+
+    GenericTestUtils.waitFor(
+        () -> customRM.isThreadWaiting(),
+        100,
+        1000);
+
+    // If the queue is empty, the processAll method should have been called
+    assertEquals(processAllCalled.get(), queueIsEmpty);
+
+    customRM.stop();
+  }
+
   @SafeVarargs
   private final Set<ContainerReplica>  addReplicas(ContainerInfo container,
       ContainerReplicaProto.State replicaState,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java
new file mode 100644
index 0000000000..480c873cbd
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Test the ReplicationManagerEventHandler class.
+ */
+public class TestReplicationManagerEventHandler {
+  private ReplicationManager replicationManager;
+  private ReplicationManagerEventHandler replicationManagerEventHandler;
+  private EventPublisher publisher;
+  private SCMContext scmContext;
+
+  @BeforeEach
+  public void setUp() {
+    replicationManager = mock(ReplicationManager.class);
+    publisher = mock(EventPublisher.class);
+    scmContext = mock(SCMContext.class);
+    replicationManagerEventHandler = new 
ReplicationManagerEventHandler(replicationManager, scmContext);
+  }
+
+  private static Stream<Arguments> testData() {
+    return Stream.of(
+      Arguments.of(true, false, true),
+      Arguments.of(false, true, false),
+      Arguments.of(true, true, false),
+      Arguments.of(false, false, false)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("testData")
+  public void testReplicationManagerEventHandler(boolean isLeaderReady, 
boolean isInSafeMode,
+      boolean isExpectedToNotify) {
+    when(scmContext.isLeaderReady()).thenReturn(isLeaderReady);
+    when(scmContext.isInSafeMode()).thenReturn(isInSafeMode);
+    DatanodeDetails dataNodeDetails = 
MockDatanodeDetails.randomDatanodeDetails();
+    replicationManagerEventHandler.onMessage(dataNodeDetails, publisher);
+
+    verify(replicationManager, times(isExpectedToNotify ? 1 : 
0)).notifyNodeStateChange();
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index aedf64f926..674fe2b972 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -23,6 +23,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -107,6 +108,7 @@ public void setup() throws IOException, 
AuthenticationException {
     pipelineManager =
         (PipelineManagerImpl)scm.getPipelineManager();
     pipelineManager.setScmContext(scmContext);
+    scm.getReplicationManager().setScmContext(scmContext);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -230,6 +232,10 @@ public void testOnMessage(@TempDir File tempDir) throws 
Exception {
     assertFalse(
         nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
 
+    verify(publisher, 
times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1);
+
+    clearInvocations(publisher);
+
     verify(deletedBlockLog, times(0))
         .onDatanodeDead(datanode1.getUuid());
 
@@ -259,8 +265,8 @@ public void testOnMessage(@TempDir File tempDir) throws 
Exception {
         nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
     assertEquals(0, nodeManager.getCommandQueueCount(datanode1.getUuid(), 
cmd.getType()));
 
-    verify(deletedBlockLog, times(1))
-        .onDatanodeDead(datanode1.getUuid());
+    verify(publisher).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, 
datanode1);
+    verify(deletedBlockLog).onDatanodeDead(datanode1.getUuid());
 
     container1Replicas = containerManager
         
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index cb2315f7fd..89b1cbd3e1 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -86,6 +86,7 @@
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -2038,4 +2039,64 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
       assertEquals(emptyList(), nodeManager.getNodesByAddress(ipAddress));
     }
   }
+
+  private static Stream<Arguments> nodeStateTransitions() {
+    return Stream.of(
+        // start decommissioning or entering maintenance
+        Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE,
+                    HddsProtos.NodeOperationalState.DECOMMISSIONING, true),
+        Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE,
+                    HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, 
true),
+        // back to service (DataNodeAdminMonitor abort workflow, maintenance 
end time expired or node is dead)
+        Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+                    HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONED,
+                    HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+                    HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        Arguments.of(HddsProtos.NodeOperationalState.IN_MAINTENANCE,
+                    HddsProtos.NodeOperationalState.IN_SERVICE, true),
+        // there is no under/over replicated containers on the node, completed 
the admin workflow
+        Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+                    HddsProtos.NodeOperationalState.DECOMMISSIONED, false),
+        Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+                    HddsProtos.NodeOperationalState.IN_MAINTENANCE, false)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("nodeStateTransitions")
+  public void testNodeOperationalStateChange(
+      HddsProtos.NodeOperationalState oldState,
+      HddsProtos.NodeOperationalState newState,
+      boolean shouldNotify)
+      throws IOException, NodeNotFoundException, AuthenticationException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
+    when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
+    EventPublisher eventPublisher = mock(EventPublisher.class);
+    HDDSLayoutVersionManager lvm = new 
HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion());
+    createNodeManager(getConf());
+    SCMNodeManager nodeManager = new SCMNodeManager(conf,
+        scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
+        scmContext, lvm);
+
+    DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails();
+    datanode.setPersistedOpState(oldState);
+    nodeManager.register(datanode, null, 
HddsTestUtils.getRandomPipelineReports());
+
+    nodeManager.setNodeOperationalState(datanode, newState, 0);
+    verify(eventPublisher, 
times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode);
+
+    DatanodeDetails reportedDatanode = 
MockDatanodeDetails.createDatanodeDetails(
+        datanode.getUuid());
+    reportedDatanode.setPersistedOpState(newState);
+
+    nodeManager.processHeartbeat(reportedDatanode);
+
+    verify(eventPublisher, times(shouldNotify ? 1 : 0)).fireEvent(
+        SCMEvents.REPLICATION_MANAGER_NOTIFY, reportedDatanode);
+
+    nodeManager.close();
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java
new file mode 100644
index 0000000000..8556a01e53
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.getDNHostAndPort;
+import static 
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachHealthState;
+import static 
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachOpState;
+import static 
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachPersistedOpState;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneTestUtils;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneKeyLocation;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for ReplicationManager.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class TestReplicationManagerIntegration {
+  private static final int DATANODE_COUNT = 5;
+  private static final int HEALTHY_REPLICA_NUM = 3;
+  private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG = 
RatisReplicationConfig
+      .getInstance(HddsProtos.ReplicationFactor.THREE);
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationManagerIntegration.class);
+
+  private MiniOzoneCluster cluster;
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ReplicationManager replicationManager;
+  private StorageContainerManager scm;
+  private OzoneClient client;
+  private ContainerOperationClient scmClient;
+  private OzoneBucket bucket;
+
+  @BeforeAll
+  void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        100, MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 100, MILLISECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 100, MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, 
MILLISECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, MILLISECONDS);
+    conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 100, MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 2, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+        1, SECONDS);
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
+        1, SECONDS);
+    conf.setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        0, SECONDS);
+    conf.set(OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+    conf.set(OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+    conf.set(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
+
+    ReplicationManagerConfiguration replicationConf = 
conf.getObject(ReplicationManagerConfiguration.class);
+    replicationConf.setInterval(Duration.ofSeconds(1));
+    replicationConf.setUnderReplicatedInterval(Duration.ofMillis(100));
+    replicationConf.setOverReplicatedInterval(Duration.ofMillis(100));
+    conf.setFromObject(replicationConf);
+
+    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(DATANODE_COUNT);
+
+    cluster = builder.build();
+    cluster.getConf().setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 
0, SECONDS);
+    cluster.waitForClusterToBeReady();
+
+    scm = cluster.getStorageContainerManager();
+    nodeManager = scm.getScmNodeManager();
+    containerManager = scm.getContainerManager();
+    replicationManager = scm.getReplicationManager();
+
+    client = cluster.newClient();
+    scmClient = new ContainerOperationClient(cluster.getConf());
+    bucket = TestDataUtil.createVolumeAndBucket(client);
+  }
+
+  @AfterAll
+  void shutdown() {
+    IOUtils.close(LOG, client, scmClient, cluster);
+  }
+
+  @Order(1)
+  @Test
+  void testReplicationManagerNotify() throws Exception {
+    // Test if RM notify works
+    replicationManager.getConfig().setInterval(Duration.ofSeconds(300));
+    GenericTestUtils.waitFor(() -> replicationManager.isThreadWaiting(), 200, 
30000);
+  }
+
+  @Order(Integer.MAX_VALUE)
+  @Test
+  public void testClosedContainerReplicationWhenNodeDies()
+      throws Exception {
+    String keyName = "key-" + UUID.randomUUID();
+    TestDataUtil.createKey(bucket, keyName, RATIS_REPLICATION_CONFIG,
+        "this is the content".getBytes(StandardCharsets.UTF_8));
+
+    // Get the container ID for the key
+    OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+    List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations();
+    long containerID = keyLocations.get(0).getContainerID();
+    ContainerID containerId = ContainerID.valueOf(containerID);
+    // open container would not be handled to do any further processing in RM
+    OzoneTestUtils.closeContainer(scm, 
containerManager.getContainer(containerId));
+
+    assertEquals(HEALTHY_REPLICA_NUM, 
containerManager.getContainerReplicas(containerId).size());
+
+    final DatanodeDetails targetDatanode = findReplica(containerId);
+
+    cluster.shutdownHddsDatanode(targetDatanode);
+    waitForDnToReachHealthState(nodeManager, targetDatanode, DEAD);
+
+    // Check if the replicas nodes don't contain dead one
+    // and the replica of container replica num is considered to be healthy
+    GenericTestUtils.waitFor(() -> {
+      try {
+        Set<ContainerReplica> replicas = 
containerManager.getContainerReplicas(containerId);
+        boolean deadNodeNotInContainerReplica = replicas.stream()
+            .noneMatch(r -> r.getDatanodeDetails().equals(targetDatanode));
+        boolean hasHealthyReplicaNum = replicas.size() == HEALTHY_REPLICA_NUM;
+        return deadNodeNotInContainerReplica && hasHealthyReplicaNum;
+      } catch (ContainerNotFoundException e) {
+        return false;
+      }
+    }, 100, 30000);
+  }
+
+  private DatanodeDetails findReplica(ContainerID containerId) throws 
ContainerNotFoundException {
+    // Find a datanode that has a replica of this container
+    return containerManager.getContainerReplicas(containerId).stream()
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("Replica not found for " + 
containerId))
+        .getDatanodeDetails();
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = NodeOperationalState.class, names = {"IN_MAINTENANCE", 
"DECOMMISSIONED"})
+  void testClosedContainerReplicationWhenNodeDecommissionAndBackToInService(
+      NodeOperationalState expectedOpState)
+      throws Exception {
+
+    String keyName = "key-" + UUID.randomUUID();
+    TestDataUtil.createKey(bucket, keyName, RATIS_REPLICATION_CONFIG,
+        "this is the content".getBytes(StandardCharsets.UTF_8));
+
+    OzoneKeyDetails key = bucket.getKey(keyName);
+    List<OzoneKeyLocation> keyLocations = key.getOzoneKeyLocations();
+
+    long containerID = keyLocations.get(0).getContainerID();
+    ContainerID containerId = ContainerID.valueOf(containerID);
+    ContainerInfo containerInfo = containerManager.getContainer(containerId);
+    OzoneTestUtils.closeContainer(scm, containerInfo);
+
+    assertEquals(containerManager.getContainerReplicas(containerId).size(), 
HEALTHY_REPLICA_NUM);
+
+    DatanodeDetails datanode = findReplica(containerId);
+
+    if (expectedOpState == IN_MAINTENANCE) {
+      
scmClient.startMaintenanceNodes(Collections.singletonList(getDNHostAndPort(datanode)),
 0, false);
+      waitForDnToReachOpState(nodeManager, datanode, expectedOpState);
+      assertEquals(containerManager.getContainerReplicas(containerId).size(),
+          HEALTHY_REPLICA_NUM);
+    } else {
+      
scmClient.decommissionNodes(Collections.singletonList(getDNHostAndPort(datanode)),
 false);
+      waitForDnToReachOpState(nodeManager, datanode, expectedOpState);
+      // decommissioning node would be excluded
+      assertEquals(containerManager.getContainerReplicas(containerId).size(),
+          HEALTHY_REPLICA_NUM + 1);
+    }
+
+    // bring the node back to service
+    
scmClient.recommissionNodes(Collections.singletonList(getDNHostAndPort(datanode)));
+
+    waitForDnToReachOpState(nodeManager, datanode, IN_SERVICE);
+    waitForDnToReachPersistedOpState(datanode, IN_SERVICE);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return containerManager.getContainerReplicas(containerId).size() == 
HEALTHY_REPLICA_NUM;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 200, 30000);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to