This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fd8948134ef HDDS-13890. Datanode supports dynamic configuration of SCM (#9385)
fd8948134ef is described below

commit fd8948134efa1e96505d4446d4657f2ad5003396
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Dec 15 16:48:22 2025 +0800

    HDDS-13890. Datanode supports dynamic configuration of SCM (#9385)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |  35 ++
 .../org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java |   7 +-
 .../hadoop/hdds/conf/ConfigurationSource.java      |   4 +
 .../apache/hadoop/ozone/HddsDatanodeService.java   | 128 ++++++
 .../common/statemachine/DatanodeQueueMetrics.java  |  22 +
 .../common/statemachine/DatanodeStateMachine.java  |  16 +
 .../common/statemachine/SCMConnectionManager.java  |  20 +-
 .../common/statemachine/StateContext.java          |  11 +
 .../container/common/states/DatanodeState.java     |   3 +-
 .../states/datanode/RunningDatanodeState.java      |  14 +-
 .../states/datanode/TestRunningDatanodeState.java  |   1 +
 .../hdds/scm/server/StorageContainerManager.java   |  21 +-
 .../scm/TestDatanodeSCMNodesReconfiguration.java   | 446 +++++++++++++++++++++
 hadoop-ozone/mini-cluster/pom.xml                  |   4 +
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       | 203 ++++++++++
 15 files changed, 923 insertions(+), 12 deletions(-)
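
This commit makes "ozone.scm.nodes.<serviceId>" a reconfigurable datanode
property. A minimal sketch of driving the new property in-process, modeled on
the TestDatanodeSCMNodesReconfiguration test added below (the variables
datanode and scmServiceId, and the node ID "scm4", are illustrative only):

    // The per-node address keys must already be present in the datanode
    // configuration, since only one property can be reconfigured at a time.
    datanode.getConf().set("ozone.scm.address." + scmServiceId + ".scm4", "scm4.host");
    datanode.getConf().set("ozone.scm.datanode.address." + scmServiceId + ".scm4", "scm4.host:9861");
    // Trigger the reconfiguration; SCM connections are created and removed
    // to match the new node list.
    datanode.getReconfigurationHandler().reconfigureProperty(
        "ozone.scm.nodes." + scmServiceId, "scm1,scm2,scm3,scm4");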

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 9170f61a3f7..e002de647a8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -31,6 +31,7 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
@@ -57,10 +58,12 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import javax.management.ObjectName;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.ConfigRedactor;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -331,6 +334,38 @@ public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
     }
   }
 
+  /**
+   * Returns the SCM addresses for datanodes based on the service ID and the requested SCM node IDs.
+   * @param conf Configuration
+   * @param scmServiceId SCM service ID
+   * @param scmNodeIds Requested SCM node IDs
+   * @return A collection with the addresses of the requested SCM node IDs,
+   * or null if any SCM address is wrongly configured. Note that the returned collection
+   * might not be ordered the same way as the requested SCM node IDs.
+   */
+  public static Collection<Pair<String, InetSocketAddress>> getSCMAddressForDatanodes(
+      ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
+    Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new HashSet<>(scmNodeIds.size());
+    for (String scmNodeId : scmNodeIds) {
+      String addressKey = ConfUtils.addKeySuffixes(
+          OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+      String scmAddress = conf.get(addressKey);
+      if (scmAddress == null) {
+        LOG.warn("The SCM address configuration {} is not defined, returning nothing", addressKey);
+        return null;
+      }
+
+      int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
+          OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
+          OZONE_SCM_DATANODE_PORT_DEFAULT);
+
+      String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress, scmDatanodePort);
+      InetSocketAddress scmDatanodeAddress = NetUtils.createSocketAddr(scmDatanodeAddressStr);
+      scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
+    }
+    return scmNodeAddress;
+  }
+
   /**
    * Retrieve the socket addresses of recon.
    *
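
For illustration, the new overload resolves each host from
"ozone.scm.address.<serviceId>.<nodeId>" and the port from the datanode-facing
address/port keys, falling back to the default datanode port. A hedged sketch
with made-up values:

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("ozone.scm.address.scmservice.scm1", "scm1.host");
    conf.set("ozone.scm.datanode.address.scmservice.scm1", "scm1.host:9861");
    Collection<Pair<String, InetSocketAddress>> addrs =
        HddsUtils.getSCMAddressForDatanodes(conf, "scmservice",
            Collections.singleton("scm1"));
    // -> [(scm1, scm1.host:9861)]; returns null if the address key of any
    //    requested node ID is missing.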
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
index d0806b34d58..53bc67b1fe9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
@@ -181,12 +181,11 @@ public static List<SCMNodeInfo> buildNodeInfo(ConfigurationSource conf) {
 
   }
 
-  private static String buildAddress(String address, int port) {
-    return new StringBuilder().append(address).append(':')
-        .append(port).toString();
+  public static String buildAddress(String address, int port) {
+    return address + ':' + port;
   }
 
-  private static int getPort(ConfigurationSource conf,
+  public static int getPort(ConfigurationSource conf,
       String scmServiceId, String scmNodeId, String configKey,
       String portKey, int defaultPort) {
     String suffixKey = ConfUtils.addKeySuffixes(configKey, scmServiceId,
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
index ff659ddba35..5fc7c3c3ddc 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
@@ -98,6 +98,10 @@ default String getTrimmed(String key, String defaultValue) {
 
   default String[] getTrimmedStrings(String name) {
     String valueString = get(name);
+    return getTrimmedStringsFromValue(valueString);
+  }
+
+  static String[] getTrimmedStringsFromValue(String valueString) {
     if (null == valueString) {
       return EMPTY_STRING_ARRAY;
     }
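
The refactoring exposes the trim-and-split logic as a static helper so that
reconfigScmNodes (below) can parse a raw property value without a
Configuration lookup. A small usage sketch, assuming the usual
comma-separated format:

    String[] ids = ConfigurationSource.getTrimmedStringsFromValue(" scm1, scm2 ,scm3 ");
    // -> ["scm1", "scm2", "scm3"]; a null value yields an empty array.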
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 3efa83a55cd..28d5d5a2cb1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTP;
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTPS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NODES_KEY;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
@@ -34,23 +35,33 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.management.ObjectName;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hdds.DatanodeVersion;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.cli.GenericCli;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -72,6 +83,9 @@
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -124,6 +138,7 @@ public class HddsDatanodeService extends GenericCli implements Callable<Void>, S
   private HddsDatanodeClientProtocolServer clientProtocolServer;
   private OzoneAdmins admins;
   private ReconfigurationHandler reconfigurationHandler;
+  private String scmServiceId;
 
   //Constructor for DataNode PluginService
   public HddsDatanodeService() { }
@@ -207,6 +222,7 @@ public void start(OzoneConfiguration configuration) {
     start();
   }
 
+  @SuppressWarnings("methodlength")
   public void start() {
     serviceRuntimeInfo = new DNMXBeanImpl(HddsVersionInfo.HDDS_VERSION_INFO) {
       @Override
@@ -294,6 +310,12 @@ public String getNamespace() {
               .register(REPLICATION_STREAMS_LIMIT_KEY,
                   this::reconfigReplicationStreamsLimit);
 
+      scmServiceId = HddsUtils.getScmServiceId(conf);
+      if (scmServiceId != null) {
+        reconfigurationHandler.register(OZONE_SCM_NODES_KEY + "." + scmServiceId,
+            this::reconfigScmNodes);
+      }
+
       reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
 
       datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf,
@@ -680,6 +702,112 @@ private String reconfigBlockDeletingServiceTimeout(String value) {
     return value;
   }
 
+  /**
+   * Reconfigure the SCM nodes configuration, which will trigger the creation and removal of
+   * SCM connections based on the difference between the old and the new SCM nodes configuration.
+   * <p>
+   * The assumption is that the SCM node address configurations exist for all the involved node IDs,
+   * because reconfiguration can only support one configuration field at a time.
+   * @param value The new configuration value for "ozone.scm.nodes.SERVICEID"
+   * @return new configuration for "ozone.scm.nodes.SERVICEID" which reflects the SCMs that the datanode
+   *         is now connected to.
+   */
+  private String reconfigScmNodes(String value) {
+    if (StringUtils.isBlank(value)) {
+      throw new IllegalArgumentException("Reconfiguration failed since setting the empty SCM nodes " +
+          "configuration is not allowed");
+    }
+    Set<String> previousNodeIds = new HashSet<>(HddsUtils.getSCMNodeIds(getConf(), scmServiceId));
+    Set<String> newScmNodeIds = Stream.of(ConfigurationSource.getTrimmedStringsFromValue(value))
+        .collect(Collectors.toSet());
+
+    if (newScmNodeIds.isEmpty()) {
+      throw new IllegalArgumentException("Reconfiguration failed since setting the empty SCM nodes " +
+          "configuration is not allowed");
+    }
+
+    Set<String> scmNodesIdsToAdd = Sets.difference(newScmNodeIds, previousNodeIds);
+    Set<String> scmNodesIdsToRemove = Sets.difference(previousNodeIds, newScmNodeIds);
+
+    // We should only update the configuration with the SCMs that are actually added / removed.
+    // If there is a partial reconfiguration (e.g. one successful add and one failed add),
+    // we want to be able to retry the failed node reconfiguration.
+    // If we don't handle this, the subsequent reconfiguration will not work since the node
+    // configuration already exists / is already removed.
+    Set<String> effectiveScmNodeIds = new HashSet<>(previousNodeIds);
+
+    LOG.info("Reconfiguring SCM nodes for service ID {}: adding SCM nodes {} and removing SCM nodes {}",
+        scmServiceId, scmNodesIdsToAdd, scmNodesIdsToRemove);
+
+    Collection<Pair<String, InetSocketAddress>> scmToAdd = HddsUtils.getSCMAddressForDatanodes(
+        getConf(), scmServiceId, scmNodesIdsToAdd);
+    if (scmToAdd == null) {
+      throw new IllegalStateException("Reconfiguration failed to get SCM address to add due to wrong configuration");
+    }
+    Collection<Pair<String, InetSocketAddress>> scmToRemove = HddsUtils.getSCMAddressForDatanodes(
+        getConf(), scmServiceId, scmNodesIdsToRemove);
+    if (scmToRemove == null) {
+      throw new IllegalArgumentException(
+          "Reconfiguration failed to get SCM address to remove due to wrong configuration");
+    }
+
+    StateContext context = datanodeStateMachine.getContext();
+    SCMConnectionManager connectionManager = datanodeStateMachine.getConnectionManager();
+
+    // Assert that the datanode is in RUNNING state since
+    // 1. If the datanode state is INIT, there might be concurrent connection manager operations
+    //    that might cause unpredictable behaviors
+    // 2. If the datanode state is SHUTDOWN, the datanode is shutting down and there is no need
+    //    to reconfigure the connections.
+    if (!DatanodeStates.RUNNING.equals(context.getState())) {
+      throw new IllegalStateException("Reconfiguration failed since the datanode's current state " +
+          context.getState() + " is not RUNNING");
+    }
+
+    // Add the new SCM servers
+    for (Pair<String, InetSocketAddress> pair : scmToAdd) {
+      String scmNodeId = pair.getLeft();
+      InetSocketAddress scmAddress = pair.getRight();
+      if (scmAddress.isUnresolved()) {
+        LOG.warn("Reconfiguration failed to add SCM address {} for SCM service {} since it can't " +
+            "be resolved, skipping", scmAddress, scmServiceId);
+        continue;
+      }
+      try {
+        connectionManager.addSCMServer(scmAddress, context.getThreadNamePrefix());
+        context.addEndpoint(scmAddress);
+        effectiveScmNodeIds.add(scmNodeId);
+        LOG.info("Reconfiguration successfully added SCM address {} for SCM service {}", scmAddress, scmServiceId);
+      } catch (IOException e) {
+        LOG.error("Reconfiguration failed to add SCM address {} for SCM service {}", scmAddress, scmServiceId, e);
+      }
+    }
+
+    // Remove the old SCM servers
+    for (Pair<String, InetSocketAddress> pair : scmToRemove) {
+      String scmNodeId = pair.getLeft();
+      InetSocketAddress scmAddress = pair.getRight();
+      try {
+        connectionManager.removeSCMServer(scmAddress);
+        context.removeEndpoint(scmAddress);
+        effectiveScmNodeIds.remove(scmNodeId);
+        LOG.info("Reconfiguration successfully removed SCM address {} for SCM service {}",
+            scmAddress, scmServiceId);
+      } catch (IOException e) {
+        LOG.error("Reconfiguration failed to remove SCM address {} for SCM service {}", scmAddress, scmServiceId, e);
+      }
+    }
+
+    // Resize the executor pool size to (number of SCMs + 1 Recon)
+    // Refer to DatanodeStateMachine#getEndPointTaskThreadPoolSize
+    datanodeStateMachine.resizeExecutor(connectionManager.getNumOfConnections());
+
+    // TODO: In the future, we might also do some assertions on the SCM:
+    //  - The SCM cannot be a leader, since this causes the datanode to disappear
+    //  - The SCM should be decommissioned
+    return String.join(",", effectiveScmNodeIds);
+  }
+
   /**
    * Returns the initial version of the datanode.
    */
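
Note on the return value of reconfigScmNodes: per its javadoc, the returned
string becomes the stored value of the property, so after a partial failure
the stored node list contains only the successfully applied IDs and the same
reconfiguration can simply be retried. An illustrative sketch (node IDs are
made up):

    // Suppose "scm1,scm2" was reconfigured to "scm1,scm2,scm3" but adding
    // scm3 failed with an IOException:
    String stored = datanode.getConf().get("ozone.scm.nodes." + scmServiceId);
    // stored == "scm1,scm2"; re-running reconfigureProperty with
    // "scm1,scm2,scm3" computes the diff again and retries only scm3.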
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
index c0ed734da69..d442b95285d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.metrics2.lib.Interns.info;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
@@ -171,6 +172,27 @@ public void addEndpoint(InetSocketAddress endpoint) {
                 .to(CaseFormat.UPPER_CAMEL, k.getHostName())));
   }
 
+  public void removeEndpoint(InetSocketAddress endpoint) {
+    incrementalReportsQueueMap.remove(endpoint);
+    containerActionQueueMap.remove(endpoint);
+    pipelineActionQueueMap.remove(endpoint);
+  }
+
+  @VisibleForTesting
+  public int getIncrementalReportsQueueMapSize() {
+    return incrementalReportsQueueMap.size();
+  }
+
+  @VisibleForTesting
+  public int getContainerActionQueueMapSize() {
+    return containerActionQueueMap.size();
+  }
+
+  @VisibleForTesting
+  public int getPipelineActionQueueMapSize() {
+    return pipelineActionQueueMap.size();
+  }
+
   private MetricsInfo getMetricsInfo(String prefix, String metricName) {
     String metric = prefix + WordUtils.capitalize(metricName) + "Size";
     String description = "Queue size of " + metricName + " from " + prefix;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ad64b291e16..f05a14e2109 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.NettyMetrics;
 import org.apache.hadoop.hdfs.util.EnumCounters;
@@ -795,4 +796,19 @@ public VolumeChoosingPolicy getVolumeChoosingPolicy() {
   public void setNextHB(long time) {
     nextHB.set(time);
   }
+
+  @VisibleForTesting
+  public ExecutorService getExecutorService() {
+    return executorService;
+  }
+
+  /**
+   * Resize the executor based on the number of active endpoint tasks.
+   */
+  public void resizeExecutor(int size) {
+    if (executorService instanceof ThreadPoolExecutor) {
+      ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService;
+      HddsServerUtil.setPoolSize(tpe, size, LOG);
+    }
+  }
 }
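
For context: resizing a live ThreadPoolExecutor requires ordering the two
setters, because setMaximumPoolSize rejects a value below the current core
size (and setCorePoolSize rejects a value above the maximum on newer JDKs).
A sketch of the idea behind HddsServerUtil.setPoolSize; the actual helper may
differ:

    static void resizePool(ThreadPoolExecutor tpe, int newSize) {
      if (newSize >= tpe.getCorePoolSize()) {
        tpe.setMaximumPoolSize(newSize); // grow the ceiling first
        tpe.setCorePoolSize(newSize);
      } else {
        tpe.setCorePoolSize(newSize);    // shrink the floor first
        tpe.setMaximumPoolSize(newSize);
      }
    }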
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
index 27bbb30d77b..e5d586832c5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.protocolPB.ReconDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
@@ -227,15 +228,14 @@ public void addReconServer(InetSocketAddress address,
   public void removeSCMServer(InetSocketAddress address) throws IOException {
     writeLock();
     try {
-      if (!scmMachines.containsKey(address)) {
+      EndpointStateMachine endPoint = scmMachines.remove(address);
+      if (endPoint == null) {
         LOG.warn("Trying to remove a non-existent SCM machine. " +
             "Ignoring the request.");
         return;
       }
-
-      EndpointStateMachine endPoint = scmMachines.get(address);
+      endPoint.setState(EndPointStates.SHUTDOWN);
       endPoint.close();
-      scmMachines.remove(address);
     } finally {
       writeUnlock();
     }
@@ -274,4 +274,16 @@ public List<EndpointStateMachineMBean> getSCMServers() {
       readUnlock();
     }
   }
+
+  /**
+   * @return the number of connections (both SCM and Recon)
+   */
+  public int getNumOfConnections() {
+    readLock();
+    try {
+      return scmMachines.size();
+    } finally {
+      readUnlock();
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 24496525a56..150159eb84a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -911,6 +911,17 @@ public void addEndpoint(InetSocketAddress endpoint) {
     }
   }
 
+  public void removeEndpoint(InetSocketAddress endpoint) {
+    this.endpoints.remove(endpoint);
+    this.containerActions.remove(endpoint);
+    this.pipelineActions.remove(endpoint);
+    this.incrementalReportsQueue.remove(endpoint);
+    this.isFullReportReadyToBeSent.remove(endpoint);
+    if (getQueueMetrics() != null) {
+      getQueueMetrics().removeEndpoint(endpoint);
+    }
+  }
+
   @VisibleForTesting
   public Message getContainerReports() {
     return containerReports.get();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
index d1d0f6dc7bd..8eeee8793d1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
@@ -38,8 +38,9 @@ public interface DatanodeState<T> {
 
   /**
    * Executes one or more tasks that is needed by this state.
+   * Note that it is unsafe to call this method concurrently.
    *
-   * @param executor -  ExecutorService
+   * @param executor -  ExecutorService that can be used by the DatanodeState to submit tasks to.
    */
   void execute(ExecutorService executor);
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index e246238b541..6b8ea712a93 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -52,6 +52,10 @@ public class RunningDatanodeState implements DatanodeState {
   private final ConfigurationSource conf;
   private final StateContext context;
   private CompletionService<EndPointStates> ecs;
+  // Since the connectionManager endpoints can be changed by reconfiguration,
+  // we should not rely on ConnectionManager#getValues being unchanged between
+  // execute and await
+  private int executingEndpointCount = 0;
 
   public RunningDatanodeState(ConfigurationSource conf,
       SCMConnectionManager connectionManager,
@@ -85,6 +89,7 @@ public void onExit() {
   @Override
   public void execute(ExecutorService executor) {
     ecs = new ExecutorCompletionService<>(executor);
+    executingEndpointCount = 0;
     for (EndpointStateMachine endpoint : connectionManager.getValues()) {
       Callable<EndPointStates> endpointTask = buildEndPointTask(endpoint);
       if (endpointTask != null) {
@@ -109,6 +114,7 @@ public void execute(ExecutorService executor) {
             throw timeoutEx;
           }
         });
+        executingEndpointCount++;
       } else {
         // This can happen if a task is taking more time than the timeOut
         // specified for the task in await, and when it is completed the task
@@ -125,6 +131,11 @@ public void setExecutorCompletionService(ExecutorCompletionService e) {
     this.ecs = e;
   }
 
+  @VisibleForTesting
+  public void setExecutingEndpointCount(int executingEndpointCount) {
+    this.executingEndpointCount = executingEndpointCount;
+  }
+
   private Callable<EndPointStates> buildEndPointTask(
       EndpointStateMachine endpoint) {
     switch (endpoint.getState()) {
@@ -199,14 +210,13 @@ private Callable<EndPointStates> buildEndPointTask(
   public DatanodeStateMachine.DatanodeStates
       await(long duration, TimeUnit timeUnit)
       throws InterruptedException {
-    int count = connectionManager.getValues().size();
     int returned = 0;
     long durationMS = timeUnit.toMillis(duration);
     long timeLeft = durationMS;
     long startTime = Time.monotonicNow();
     List<Future<EndPointStates>> results = new LinkedList<>();
 
-    while (returned < count && timeLeft > 0) {
+    while (returned < executingEndpointCount && timeLeft > 0) {
       Future<EndPointStates> result =
           ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
       if (result != null) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
index 84313c82b98..51390ae632b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
@@ -64,6 +64,7 @@ public void testAwait() throws InterruptedException {
     for (int i = 0; i < threadPoolSize; i++) {
       ecs.submit(() -> futureOne.get());
     }
+    state.setExecutingEndpointCount(threadPoolSize);
 
     long startTime = Time.monotonicNow();
     state.await(500, TimeUnit.MILLISECONDS);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 2a9c5f0ae49..f9a7f94f8e5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -283,7 +283,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private RootCARotationManager rootCARotationManager;
   private ContainerTokenSecretManager containerTokenMgr;
 
-  private final OzoneConfiguration configuration;
+  private OzoneConfiguration configuration;
   private SCMContainerMetrics scmContainerMetrics;
   private SCMContainerPlacementMetrics placementMetrics;
   private PlacementPolicy containerPlacementPolicy;
@@ -603,6 +603,11 @@ public OzoneConfiguration getConfiguration() {
     return configuration;
   }
 
+  @VisibleForTesting
+  public void setConfiguration(OzoneConfiguration conf) {
+    this.configuration = conf;
+  }
+
   /**
    * Create an SCM instance based on the supplied configuration.
    *
@@ -2245,6 +2250,20 @@ public boolean removePeerFromHARing(String scmId)
 
   }
 
+  /**
+   * Check if the input scmId exists in the peers list.
+   * @return true if the scmId is self, or it exists in the peer node list,
+   *         false otherwise.
+   */
+  @VisibleForTesting
+  public boolean doesPeerExist(String scmId) {
+    if (getScmId().equals(scmId)) {
+      return true;
+    }
+    return getScmHAManager().getRatisServer().getDivision()
+        .getGroup().getPeer(RaftPeerId.valueOf(scmId)) != null;
+  }
+
   public void scmHAMetricsUpdate(String leaderId) {
     // unregister, in case metrics already exist
     // so that the metric tags will get updated.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestDatanodeSCMNodesReconfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestDatanodeSCMNodesReconfiguration.java
new file mode 100644
index 00000000000..d2c90c21d65
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestDatanodeSCMNodesReconfiguration.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NODES_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test datanode's SCM nodes reconfiguration.
+ */
+@Timeout(300)
+public class TestDatanodeSCMNodesReconfiguration {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestDatanodeSCMNodesReconfiguration.class);
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private String scmServiceId;
+
+  @BeforeEach
+  public void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+    conf.set(ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
+        "5s");
+    conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP, "1");
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
+        MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+    conf.setQuietMode(false);
+    scmServiceId = "scm-service-test1";
+    cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setOMServiceId("om-service-test1")
+        .setSCMServiceId(scmServiceId)
+        .setNumOfStorageContainerManagers(3)
+        .setNumOfOzoneManagers(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterEach
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * This tests the SCM migration scenario using datanode reconfiguration with zero restarts.
+   */
+  @Test
+  void testSCMMigration() throws Exception {
+    assertEquals(3, cluster.getStorageContainerManagersList().size());
+    // Bootstrap three new SCMs (there will be 6 SCMs after this)
+    // (SCM ID, SCM Node ID)
+    List<Pair<String, String>> initialSCMs = cluster.getStorageContainerManagersList().stream()
+        .map(scm -> Pair.of(scm.getScmId(), scm.getSCMNodeId())).collect(Collectors.toList());
+    String newScmNodeIdPrefix = "newScmNode-";
+    List<String> newSCMIds = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      String scmNodeId = newScmNodeIdPrefix + i;
+      cluster.bootstrapSCM(scmNodeId, true);
+      StorageContainerManager newSCM = cluster.getSCM(scmNodeId);
+      newSCMIds.add(newSCM.getScmId());
+    }
+    cluster.waitForClusterToBeReady();
+
+    // Reconfigure the datanodes to add the three new SCMs
+    String scmNodesKey = ConfUtils.addKeySuffixes(
+        OZONE_SCM_NODES_KEY, scmServiceId);
+
+    for (HddsDatanodeService datanode: cluster.getHddsDatanodes()) {
+      assertIsPropertyReconfigurable(datanode, scmNodesKey);
+      // SCM addresses need to be added to the datanode configuration first; reconfiguration will fail otherwise
+      for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+        String scmAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(scmAddrKey, cluster.getConf().get(scmAddrKey));
+        String dnPortKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(dnPortKey, cluster.getConf().get(dnPortKey));
+      }
+
+      // Sanity check before reconfiguration, there should still be 3 SCM nodes in the configuration
+      assertEquals(3, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Trigger reconfiguration, which will create new connections to the SCMs
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has added the new SCMs
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(6, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(6, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(6, datanode.getDatanodeStateMachine().getQueueMetrics().getIncrementalReportsQueueMapSize());
+      assertEquals(6, datanode.getDatanodeStateMachine().getQueueMetrics().getContainerActionQueueMapSize());
+      assertEquals(6, datanode.getDatanodeStateMachine().getQueueMetrics().getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(6, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(6, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine().getConnectionManager()
+          .getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(6, scmMachines.size());
+    }
+
+    // Ensure that the datanodes have registered to the new SCMs
+    cluster.waitForClusterToBeReady();
+
+    GenericTestUtils.waitFor(() -> {
+      for (HddsDatanodeService datanode: cluster.getHddsDatanodes()) {
+        Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+            .getConnectionManager().getValues();
+        for (EndpointStateMachine scmMachine : scmMachines) {
+          if (!scmMachine.getState().equals(EndPointStates.HEARTBEAT)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 1000, 30000);
+
+    // Wait until the datanodes are HEALTHY on all the SCMs, which indicates
+    // that the datanodes have already registered to the new SCMs
+    List<StorageContainerManager> activeSCMs = new ArrayList<>();
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      String scmNodeId = scm.getSCMNodeId();
+      if (cluster.isSCMActive(scmNodeId)) {
+        activeSCMs.add(scm);
+      }
+    }
+    GenericTestUtils.waitFor(() -> {
+      for (StorageContainerManager activeSCM : activeSCMs) {
+        int healthy = activeSCM.getNodeCount(HEALTHY);
+        int staleOrDead = activeSCM.getNodeCount(STALE) + activeSCM.getNodeCount(DEAD);
+        if (healthy != 3 || staleOrDead != 0) {
+          LOG.info("SCM {} currently has mismatched healthy (current {}, expected 3) or stale/dead DNs " +
+                  "(current {}, expected 0), waiting for next checks",
+              activeSCM.getSCMNodeId(), healthy, staleOrDead);
+          return false;
+        }
+      }
+      return true;
+    }, 1000, 120000);
+
+    // Transfer SCM leadership to one of the new SCMs before decommissioning the old SCMs
+    Collections.shuffle(newSCMIds);
+    String newLeaderScmId = newSCMIds.get(0);
+    cluster.getStorageContainerLocationClient().transferLeadership(newLeaderScmId);
+
+    // Decommission the initial SCMs (there will be 3 SCMs after this)
+    for (Pair<String, String> pair : initialSCMs) {
+      decommissionSCM(pair.getLeft(), pair.getRight());
+    }
+
+    for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+      // Sanity check before reconfiguration, there should still be 6 SCM nodes in the configuration
+      assertEquals(6, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Reconfigure the datanodes to remove the three initial SCMs
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has removed the initial SCMs
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(3, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(3, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getIncrementalReportsQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getContainerActionQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics().getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+          .getConnectionManager().getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(3, scmMachines.size());
+    }
+  }
+
+  /**
+   * Test the addition and removal of a single SCM using datanode reconfiguration.
+   */
+  @Test
+  void testAddAndRemoveOneSCM() throws Exception {
+    assertEquals(3, cluster.getStorageContainerManagersList().size());
+    // Bootstrap a single SCM
+    String newScmNodeId = "newScmNode-1";
+    cluster.bootstrapSCM(newScmNodeId, true);
+    StorageContainerManager newSCM = cluster.getSCM(newScmNodeId);
+    cluster.waitForClusterToBeReady();
+
+    // Reconfigure the datanode to add the new SCM
+    String scmNodesKey = ConfUtils.addKeySuffixes(OZONE_SCM_NODES_KEY, scmServiceId);
+
+    for (HddsDatanodeService datanode: cluster.getHddsDatanodes()) {
+      assertIsPropertyReconfigurable(datanode, scmNodesKey);
+      // SCM addresses need to be added to the datanode configuration first; reconfiguration will fail otherwise
+      for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+        String scmAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(scmAddrKey, cluster.getConf().get(scmAddrKey));
+        String dnPortKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(dnPortKey, cluster.getConf().get(dnPortKey));
+      }
+
+      // Sanity check before reconfiguration, there should still be 3 SCM nodes in the configuration
+      assertEquals(3, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Trigger reconfiguration, which will create a new connection to the new SCM
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has added the new SCM
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(4, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(4, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(4, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getIncrementalReportsQueueMapSize());
+      assertEquals(4, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getContainerActionQueueMapSize());
+      assertEquals(4, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(4, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(4, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+          .getConnectionManager().getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(4, scmMachines.size());
+    }
+
+    // Ensure that the datanodes have registered to the new SCM
+    cluster.waitForClusterToBeReady();
+
+    GenericTestUtils.waitFor(() -> {
+      for (HddsDatanodeService datanode: cluster.getHddsDatanodes()) {
+        Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+            .getConnectionManager().getValues();
+        for (EndpointStateMachine scmMachine : scmMachines) {
+          if (!scmMachine.getState().equals(EndPointStates.HEARTBEAT)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 1000, 30000);
+
+    // Wait until the datanodes are HEALTHY on all the SCMs, which indicates
+    // that the datanodes have already registered to the new SCM
+    List<StorageContainerManager> activeSCMs = new ArrayList<>();
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      String scmNodeId = scm.getSCMNodeId();
+      if (cluster.isSCMActive(scmNodeId)) {
+        activeSCMs.add(scm);
+      }
+    }
+    GenericTestUtils.waitFor(() -> {
+      for (StorageContainerManager activeSCM : activeSCMs) {
+        int healthy = activeSCM.getNodeCount(HEALTHY);
+        int staleOrDead = activeSCM.getNodeCount(STALE) + activeSCM.getNodeCount(DEAD);
+        if (healthy != 3 || staleOrDead != 0) {
+          LOG.info("SCM {} currently has mismatched healthy (current {}, expected 3) or stale/dead DNs " +
+                  "(current {}, expected 0), waiting for next checks",
+              activeSCM.getSCMNodeId(), healthy, staleOrDead);
+          return false;
+        }
+      }
+      return true;
+    }, 1000, 120000);
+
+    // Now reconfigure the DNs to remove the new SCM
+    Collection<String> scmNodes = cluster.getConf().getTrimmedStringCollection(scmNodesKey);
+    scmNodes.remove(newScmNodeId);
+    cluster.getConf().setStrings(scmNodesKey, scmNodes.toArray(new String[0]));
+
+    for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+      // Sanity check before reconfiguration, there should be 4 SCMs in the configuration
+      assertEquals(4, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Reconfigure the datanodes to remove the new SCM
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has removed the new SCM
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(3, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(3, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getIncrementalReportsQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getContainerActionQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+          .getConnectionManager().getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(3, scmMachines.size());
+    }
+
+    // Since all DNs have stopped sending heartbeats to the new SCM, it should mark
+    // these DNs as STALE/DEAD
+    GenericTestUtils.waitFor(() -> {
+      int healthy = newSCM.getNodeCount(HEALTHY);
+      int staleOrDead = newSCM.getNodeCount(STALE) + newSCM.getNodeCount(DEAD);
+      return healthy == 0 || staleOrDead == 3;
+    }, 1000, 120000);
+  }
+
+  private void decommissionSCM(String decommScmId, String decommScmNodeId) throws Exception {
+    // Decommissioned SCM will be stopped automatically, see SCMStateMachine#close
+    DecommissionScmResponseProto response = cluster.getStorageContainerLocationClient().decommissionScm(decommScmId);
+    assertTrue(response.getSuccess());
+    assertTrue(StringUtils.isEmpty(response.getErrorMsg()));
+
+    cluster.deactivateSCM(decommScmNodeId);
+
+    List<StorageContainerManager> activeSCMs = new ArrayList<>();
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      String scmNodeId = scm.getSCMNodeId();
+      if (cluster.isSCMActive(scmNodeId)) {
+        activeSCMs.add(scm);
+      }
+    }
+
+    // Update the configuration
+    String scmNodesKey = ConfUtils.addKeySuffixes(
+        OZONE_SCM_NODES_KEY, scmServiceId);
+
+    Collection<String> scmNodes = cluster.getConf().getTrimmedStringCollection(scmNodesKey);
+    scmNodes.remove(decommScmNodeId);
+    cluster.getConf().setStrings(scmNodesKey, scmNodes.toArray(new String[0]));
+
+    // Verify decomm node is removed from the HA ring
+    GenericTestUtils.waitFor(() -> {
+      for (StorageContainerManager scm : activeSCMs) {
+        if (scm.doesPeerExist(decommScmId)) {
+          return false;
+        }
+      }
+      return true;
+    }, 100, 60000);
+
+    cluster.waitForClusterToBeReady();
+  }
+
+  private void assertIsPropertyReconfigurable(HddsDatanodeService datanode, String configKey) throws IOException {
+    assertTrue(datanode.getReconfigurationHandler().isPropertyReconfigurable(configKey));
+    assertTrue(datanode.getReconfigurationHandler().listReconfigureProperties().contains(configKey));
+  }
+}
diff --git a/hadoop-ozone/mini-cluster/pom.xml b/hadoop-ozone/mini-cluster/pom.xml
index a96cfc23184..055ecc8ee8c 100644
--- a/hadoop-ozone/mini-cluster/pom.xml
+++ b/hadoop-ozone/mini-cluster/pom.xml
@@ -109,6 +109,10 @@
       <groupId>org.apache.ratis</groupId>
       <artifactId>ratis-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ratis</groupId>
+      <artifactId>ratis-server-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index f1173496583..d3369534617 100644
--- a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -117,6 +117,10 @@ public boolean isOMActive(String omNodeId) {
     return omhaService.isServiceActive(omNodeId);
   }
 
+  public boolean isSCMActive(String scmNodeId) {
+    return scmhaService.isServiceActive(scmNodeId);
+  }
+
   public Iterator<StorageContainerManager> getInactiveSCM() {
     return scmhaService.inactiveServices();
   }
@@ -151,6 +155,20 @@ public StorageContainerManager getScmLeader() {
         .findFirst().orElse(null);
   }
 
+  public StorageContainerManager getScmLeader(boolean waitForLeaderElection)
+      throws TimeoutException, InterruptedException {
+    if (waitForLeaderElection) {
+      final StorageContainerManager[] scm = new StorageContainerManager[1];
+      GenericTestUtils.waitFor(() -> {
+        scm[0] = getScmLeader();
+        return scm[0] != null;
+      }, 200, waitForClusterToBeReadyTimeout);
+      return scm[0];
+    } else {
+      return getScmLeader();
+    }
+  }
+
   public OzoneManager waitForLeaderOM()
       throws TimeoutException, InterruptedException {
     final OzoneManager[] om = new OzoneManager[1];
@@ -332,6 +350,20 @@ private static void configureOMPorts(ConfigurationTarget conf,
     conf.setInt(omRatisPortKey, getFreePort());
   }
 
+  private void stopAndDeactivate(StorageContainerManager scm) {
+    scm.stop();
+    scmhaService.deactivate(scm);
+  }
+
+  public void stopSCM(String scmNodeId) {
+    stopAndDeactivate(scmhaService.getServiceById(scmNodeId));
+  }
+
+  public void deactivateSCM(String scmNodeId) {
+    StorageContainerManager scm = scmhaService.getServiceById(scmNodeId);
+    scmhaService.deactivate(scm);
+  }
+
   /**
    * Builder for configuring the MiniOzoneCluster to run.
    */
@@ -902,6 +934,177 @@ public void setupExitManagerForTesting() {
     }
   }
 
+  private OzoneConfiguration addNewSCMToConfig(String scmServiceId, String scmNodeId) {
+    OzoneConfiguration newConf = new OzoneConfiguration(getConf());
+    StringBuilder scmNames = new StringBuilder();
+    scmNames.append(newConf.get(ScmConfigKeys.OZONE_SCM_NAMES));
+    configureSCMPorts(newConf, scmServiceId, scmNodeId, scmNames);
+
+    String scmNodesKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+    newConf.set(scmNodesKey, newConf.get(scmNodesKey) + "," + scmNodeId);
+    newConf.set(ScmConfigKeys.OZONE_SCM_NAMES, scmNames.toString());
+
+    return newConf;
+  }
+
+  private static void configureSCMPorts(ConfigurationTarget conf, String scmServiceId, String scmNodeId,
+      StringBuilder scmNames) {
+    String scmAddrKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+    String scmHttpAddrKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, scmServiceId, scmNodeId);
+    String scmHttpsAddrKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, scmServiceId, scmNodeId);
+    String scmRatisPortKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, scmServiceId, scmNodeId);
+    String dnPortKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
+        scmServiceId, scmNodeId);
+    String blockClientKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
+        scmServiceId, scmNodeId);
+    String ssClientKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY,
+        scmServiceId, scmNodeId);
+    String scmGrpcPortKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, scmServiceId, scmNodeId);
+    String scmSecurityAddrKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY, scmServiceId,
+        scmNodeId);
+
+    conf.set(scmAddrKey, "127.0.0.1");
+    conf.set(scmHttpAddrKey, localhostWithFreePort());
+    conf.set(scmHttpsAddrKey, localhostWithFreePort());
+    conf.set(scmSecurityAddrKey, localhostWithFreePort());
+    conf.set("ozone.scm.update.service.port", "0");
+
+    int ratisPort = getFreePort();
+    conf.setInt(scmRatisPortKey, ratisPort);
+    //conf.setInt("ozone.scm.ha.ratis.bind.port", ratisPort);
+
+    int dnPort = getFreePort();
+    conf.set(dnPortKey, "127.0.0.1:" + dnPort);
+    scmNames.append(",localhost:").append(dnPort);
+
+    conf.set(ssClientKey, localhostWithFreePort());
+    conf.setInt(scmGrpcPortKey, getFreePort());
+
+    String blockAddress = localhostWithFreePort();
+    conf.set(blockClientKey, blockAddress);
+    conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
+        blockAddress);
+  }
+
+  /**
+   * Update the configuration of all active SCMs in the SCM HA service.
+   */
+  public void updateSCMConfigs(OzoneConfiguration newConf) {
+    for (StorageContainerManager scm : scmhaService.getActiveServices()) {
+      scm.setConfiguration(newConf);
+    }
+  }
+
+  public void bootstrapSCM(String scmNodeId, boolean updateConfigs) throws Exception {
+    int retryCount = 0;
+    StorageContainerManager scm = null;
+
+    StorageContainerManager scmLeader = getScmLeader(true);
+    long leaderSnapshotIndex = scmLeader.getScmHAManager().getRatisServer().getSCMStateMachine()
+        .getLatestSnapshot().getIndex();
+
+    while (true) {
+      try {
+        OzoneConfiguration newConf = addNewSCMToConfig(scmhaService.getServiceId(), scmNodeId);
+
+        if (updateConfigs) {
+          updateSCMConfigs(newConf);
+        }
+
+        scm = bootstrapNewSCM(scmNodeId, newConf);
+
+        LOG.info("Bootstrapped SCM {} RPC server at {}", scmNodeId,
+            scm.getClientRpcAddress());
+
+        // Add the new SCM to the cluster's in-memory map and update the existing SCMs' conf.
+        setConf(newConf);
+
+        break;
+      } catch (IOException e) {
+        // Existing SCM config could have been updated with new conf. Reset it.
+        for (StorageContainerManager existingSCM : scmhaService.getServices()) {
+          existingSCM.setConfiguration(getConf());
+        }
+        if (e instanceof BindException ||
+            e.getCause() instanceof BindException) {
+          ++retryCount;
+          LOG.info("MiniOzoneHACluster port conflicts, retried {} times",
+              retryCount, e);
+        } else {
+          throw e;
+        }
+      }
+
+      waitForBootstrappedNodeToBeReady(scm, leaderSnapshotIndex);
+      if (updateConfigs) {
+        waitForConfigUpdateOnActiveSCMs(scmNodeId);
+      }
+    }
+  }
+
+  /**
+   * Start a new SCM in Bootstrap mode. Configs (address and ports) for the new
+   * SCM must already be set in the newConf.
+   */
+  private StorageContainerManager bootstrapNewSCM(String nodeId,
+      OzoneConfiguration newConf) throws Exception {
+    OzoneConfiguration config = new OzoneConfiguration(newConf);
+
+    // For bootstrapping node, set the nodeId config also.
+    config.set(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY, nodeId);
+
+    // Set metadata/DB dir base path
+    String metaDirPath = clusterMetaPath + "/" + nodeId;
+    config.set(OZONE_METADATA_DIRS, metaDirPath);
+
+    StorageContainerManager.scmBootstrap(config);
+    StorageContainerManager scm = StorageContainerManager.createSCM(config);
+
+    scmhaService.addInstance(scm, false);
+    startInactiveSCM(nodeId);
+
+    return scm;
+  }
+
+  private void waitForBootstrappedNodeToBeReady(StorageContainerManager newSCM,
+      long leaderSnapshotIndex) throws Exception {
+    // Wait for bootstrapped nodes to catch up with others
+    GenericTestUtils.waitFor(() -> {
+      return newSCM.getScmHAManager().getRatisServer().getSCMStateMachine()
+          .getLatestSnapshot().getIndex() >= leaderSnapshotIndex;
+    }, 1000, waitForClusterToBeReadyTimeout);
+  }
+
+  private void waitForConfigUpdateOnActiveSCMs(
+      String newSCMNodeId) throws Exception {
+    StorageContainerManager newSCMNode = scmhaService
+        .getServiceById(newSCMNodeId);
+    GenericTestUtils.waitFor(() -> {
+      // Each existing active SCM should contain the new SCM in its peer list.
+      // Also, the new SCM should contain each existing active SCM in its SCM
+      // peer list and RatisServer peer list.
+      for (StorageContainerManager scm : scmhaService.getActiveServices()) {
+        if (!scm.doesPeerExist(newSCMNode.getScmId())) {
+          return false;
+        }
+        if (!newSCMNode.doesPeerExist(scm.getSCMNodeId())) {
+          return false;
+        }
+      }
+      return true;
+    }, 1000, waitForClusterToBeReadyTimeout);
+  }
+
   /**
    * MiniOzoneHAService is a helper class used for both SCM and OM HA.
    * This class keeps track of active and inactive OM/SCM services

