This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
     new 7a26250  HDDS-4342. Add DataNode state and transitions for a node 
going through upgrade. (#1508)
7a26250 is described below

commit 7a262504dcbe701773624cc12bf4fef06212ec53
Author: prashantpogde <[email protected]>
AuthorDate: Sat Nov 14 15:20:37 2020 -0800

    HDDS-4342. Add DataNode state and transitions for a node going through 
upgrade. (#1508)
---
 .../protocol/StorageContainerNodeProtocol.java     |   4 +-
 .../interface-client/src/main/proto/hdds.proto     |   3 +-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  15 +-
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  |  46 ++++-
 .../hadoop/hdds/scm/node/NodeStateManager.java     | 200 +++++++++++++++++----
 ...=> NonHealthyToReadOnlyHealthyNodeHandler.java} |  28 ++-
 ...va => ReadOnlyHealthyToHealthyNodeHandler.java} |  11 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  24 ++-
 .../hadoop/hdds/scm/node/SCMNodeMetrics.java       |   5 +
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |   8 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |   3 +-
 .../hdds/scm/server/StorageContainerManager.java   |  17 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |  10 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   | 145 +++++++++++----
 .../hadoop/hdds/scm/node/TestStatisticsUpdate.java |  17 +-
 .../testutils/ReplicationNodeManagerMock.java      |   4 +-
 .../hadoop/ozone/scm/node/TestSCMNodeMetrics.java  |  32 +++-
 .../hadoop/ozone/TestStorageContainerManager.java  |  13 +-
 .../ozone/recon/api/ClusterStateEndpoint.java      |   4 +-
 .../hadoop/ozone/recon/scm/ReconNodeManager.java   |   8 +-
 21 files changed, 488 insertions(+), 113 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 3375773..e9071d1 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -67,9 +67,11 @@ public interface StorageContainerNodeProtocol {
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.
    * @param datanodeDetails - Datanode ID.
+   * @param layoutVersionInfo - Layout Version Proto.
    * @return SCMheartbeat response list
    */
-  List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails);
+  List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+                                    LayoutVersionProto layoutVersionInfo);
 
   /**
    * Check if node is registered or not.
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index f56cf2e..d8306fa 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -131,8 +131,7 @@ enum NodeState {
     DEAD = 3;
     DECOMMISSIONING = 4;
     DECOMMISSIONED = 5;
-    HEALTHY_READ_ONLY = 6;
-
+    HEALTHY_READONLY = 6;
 }
 
 enum QueryScope {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 8b40571..d7caffe 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -165,8 +165,19 @@ public final class SCMEvents {
    * This event will be triggered whenever a datanode is moved from non-healthy
    * state to healthy state.
    */
-  public static final TypedEvent<DatanodeDetails> NON_HEALTHY_TO_HEALTHY_NODE =
-      new TypedEvent<>(DatanodeDetails.class, "NON_HEALTHY_TO_HEALTHY_NODE");
+  public static final TypedEvent<DatanodeDetails>
+      READ_ONLY_HEALTHY_TO_HEALTHY_NODE =
+      new TypedEvent<>(DatanodeDetails.class,
+          "READ_ONLY_HEALTHY_TO_HEALTHY_NODE");
+
+  /**
+   * This event will be triggered whenever a datanode is moved from non-healthy
+   * state to readonly-healthy state.
+   */
+  public static final TypedEvent<DatanodeDetails>
+      NON_HEALTHY_TO_READONLY_HEALTHY_NODE =
+      new TypedEvent<>(DatanodeDetails.class,
+          "NON_HEALTHY_TO_READONLY_HEALTHY_NODE");
 
   /**
    * This event will be triggered by CommandStatusReportHandler whenever a
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index 2e7bdeb..92ae43b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
@@ -43,16 +45,26 @@ public class DatanodeInfo extends DatanodeDetails {
 
   private List<StorageReportProto> storageReports;
   private List<MetadataStorageReportProto> metadataStorageReports;
+  private LayoutVersionProto lastKnownLayoutVersion;
 
   /**
    * Constructs DatanodeInfo from DatanodeDetails.
    *
    * @param datanodeDetails Details about the datanode
+   * @param layoutInfo Details about the LayoutVersionProto
    */
-  public DatanodeInfo(DatanodeDetails datanodeDetails) {
+  public DatanodeInfo(DatanodeDetails datanodeDetails,
+                      LayoutVersionProto layoutInfo) {
     super(datanodeDetails);
     this.lock = new ReentrantReadWriteLock();
     this.lastHeartbeatTime = Time.monotonicNow();
+    lastKnownLayoutVersion =
+        LayoutVersionProto.newBuilder()
+            .setMetadataLayoutVersion(layoutInfo != null ?
+                layoutInfo.getMetadataLayoutVersion() : 0)
+            .setSoftwareLayoutVersion(layoutInfo != null ?
+                layoutInfo.getSoftwareLayoutVersion() : 0)
+            .build();
     this.storageReports = Collections.emptyList();
     this.metadataStorageReports = Collections.emptyList();
   }
@@ -70,6 +82,24 @@ public class DatanodeInfo extends DatanodeDetails {
   }
 
   /**
+   * Updates the last known layout version of this datanode.
+   */
+  public void updateLastKnownLayoutVersion(LayoutVersionProto version) {
+    if (version == null) {
+      return;
+    }
+    try {
+      lock.writeLock().lock();
+      lastKnownLayoutVersion = LayoutVersionProto.newBuilder()
+          .setMetadataLayoutVersion(version.getMetadataLayoutVersion())
+          .setSoftwareLayoutVersion(version.getSoftwareLayoutVersion())
+          .build();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
    * Returns the last heartbeat time.
    *
    * @return last heartbeat time.
@@ -84,6 +114,20 @@ public class DatanodeInfo extends DatanodeDetails {
   }
 
   /**
+   * Returns the last known layout version.
+   *
+   * @return last known layout version.
+   */
+  public LayoutVersionProto getLastKnownLayoutVersion() {
+    try {
+      lock.readLock().lock();
+      return lastKnownLayoutVersion;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
    * Updates the datanode storage reports.
    *
    * @param reports list of storage report
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index d51961f..9cc403f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -34,6 +34,8 @@ import java.util.function.Predicate;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
@@ -79,7 +82,8 @@ public class NodeStateManager implements Runnable, Closeable {
    * Node's life cycle events.
    */
   private enum NodeLifeCycleEvent {
-    TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED
+    TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED, LAYOUT_MISMATCH,
+    LAYOUT_MATCH
   }
 
   private static final Logger LOG = LoggerFactory
@@ -143,13 +147,19 @@ public class NodeStateManager implements Runnable, 
Closeable {
    */
   private long skippedHealthChecks;
 
+  private LayoutVersionManager layoutVersionManager;
+
   /**
    * Constructs a NodeStateManager instance with the given configuration.
    *
    * @param conf Configuration
+   * @param eventPublisher event publisher
+   * @param layoutManager Layout version manager
    */
   public NodeStateManager(ConfigurationSource conf,
-      EventPublisher eventPublisher) {
+                          EventPublisher eventPublisher,
+                          LayoutVersionManager layoutManager) {
+    this.layoutVersionManager = layoutManager;
     this.nodeStateMap = new NodeStateMap();
     this.node2PipelineMap = new Node2PipelineMap();
     this.eventPublisher = eventPublisher;
@@ -157,7 +167,9 @@ public class NodeStateManager implements Runnable, 
Closeable {
     initialiseState2EventMap();
     Set<NodeState> finalStates = new HashSet<>();
     finalStates.add(NodeState.DECOMMISSIONED);
-    this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates);
+    // All DataNodes should start in HealthyReadOnly state.
+    this.stateMachine = new StateMachine<>(NodeState.HEALTHY_READONLY,
+        finalStates);
     initializeStateMachine();
     heartbeatCheckerIntervalMs = HddsServerUtil
         .getScmheartbeatCheckerInterval(conf);
@@ -185,27 +197,42 @@ public class NodeStateManager implements Runnable, 
Closeable {
     state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
     state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
     state2EventMap
-        .put(NodeState.HEALTHY, SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE);
+        .put(NodeState.HEALTHY, SCMEvents.READ_ONLY_HEALTHY_TO_HEALTHY_NODE);
+    state2EventMap
+        .put(NodeState.HEALTHY_READONLY,
+            SCMEvents.NON_HEALTHY_TO_READONLY_HEALTHY_NODE);
   }
 
   /*
    *
    * Node and State Transition Mapping:
    *
-   * State: HEALTHY         -------------------> STALE
-   * Event:                       TIMEOUT
+   * State: HEALTHY             -------------------> STALE
+   * Event:                          TIMEOUT
    *
-   * State: STALE           -------------------> DEAD
-   * Event:                       TIMEOUT
+   * State: HEALTHY             -------------------> DECOMMISSIONING
+   * Event:                        DECOMMISSION
+   *
+   * State: HEALTHY             -------------------> HEALTHY_READONLY
+   * Event:                       LAYOUT_MISMATCH
+   *
+   * State: HEALTHY_READONLY    -------------------> HEALTHY
+   * Event:                       LAYOUT_MATCH
    *
-   * State: STALE           -------------------> HEALTHY
+   * State: HEALTHY_READONLY    -------------------> DECOMMISSIONING
+   * Event:                        DECOMMISSION
+   *
+   * State: HEALTHY_READONLY    -------------------> STALE
+   * Event:                          TIMEOUT
+   *
+   * State: STALE           -------------------> HEALTHY_READONLY
    * Event:                       RESTORE
    *
-   * State: DEAD            -------------------> HEALTHY
+   * State: DEAD            -------------------> HEALTHY_READONLY
    * Event:                       RESURRECT
    *
-   * State: HEALTHY         -------------------> DECOMMISSIONING
-   * Event:                     DECOMMISSION
+   * State: STALE           -------------------> DEAD
+   * Event:                       TIMEOUT
    *
    * State: STALE           -------------------> DECOMMISSIONING
    * Event:                     DECOMMISSION
@@ -218,25 +245,41 @@ public class NodeStateManager implements Runnable, 
Closeable {
    *
    *  Node State Flow
    *
-   *  +--------------------------------------------------------+
-   *  |                                     (RESURRECT)        |
-   *  |   +--------------------------+                         |
-   *  |   |      (RESTORE)           |                         |
-   *  |   |                          |                         |
-   *  V   V                          |                         |
-   * [HEALTHY]------------------->[STALE]------------------->[DEAD]
-   *    |         (TIMEOUT)          |         (TIMEOUT)       |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    | (DECOMMISSION)             | (DECOMMISSION)          | (DECOMMISSION)
-   *    |                            V                         |
-   *    +------------------->[DECOMMISSIONING]<----------------+
-   *                                 |
-   *                                 | (DECOMMISSIONED)
-   *                                 |
-   *                                 V
+   *                                      +->------------------->------+
+   *                                      |                            |
+   *                                      |(DECOMMISSION)              |
+   *                                      ^                            V
+   *                                      |  +-----<---------<---+     |
+   *                                      |  |    (RESURRECT)    |     |
+   *    +-->-----(LAYOUT_MISMATCH)-->--+  |  V                   |     |
+   *    |                              |  |  |                   ^     |
+   *    |                              |  ^  |                   |     |
+   *    |                              V  |  V                   |     |
+   *    |  +-----(LAYOUT_MATCH)--[HEALTHY_READONLY]              |     |
+   *    |  |                            ^  |                     |     V
+   *    |  |                            |  |                     ^     |
+   *    |  |                            |  |(TIMEOUT)            |     |
+   *    ^  |                  (RESTORE) |  |                     |     |
+   *    |  V                            |  V                     |     |
+   * [HEALTHY]---->----------------->[STALE]------->--------->[DEAD]   |
+   *    |           (TIMEOUT)         |         (TIMEOUT)       |      |
+   *    |                             |                         |      |
+   *    V                             |                         V      |
+   *    |                             |                         |      V
+   *    |                             |                         |      |
+   *    |                             |                         |      |
+   *    |(DECOMMISSION)               | (DECOMMISSION)          |(DECOMMISSION)
+   *    |                             V                         |      |
+   *    +---->---------------->[DECOMMISSIONING]<---------------+      |
+   *                                   |   ^                           |
+   *                                   |   |                           V
+   *                                   V   |                           |
+   *                                   |   +-----------<----------<----+
+   *                                   |
+   *                                   |
+   *                                   | (DECOMMISSIONED)
+   *                                   |
+   *                                   V
    *                          [DECOMMISSIONED]
    *
    */
@@ -246,13 +289,27 @@ public class NodeStateManager implements Runnable, 
Closeable {
    */
   private void initializeStateMachine() {
     stateMachine.addTransition(
+        NodeState.HEALTHY_READONLY, NodeState.HEALTHY,
+        NodeLifeCycleEvent.LAYOUT_MATCH);
+    stateMachine.addTransition(
+        NodeState.HEALTHY_READONLY, NodeState.STALE,
+        NodeLifeCycleEvent.TIMEOUT);
+    stateMachine.addTransition(
+        NodeState.HEALTHY_READONLY, NodeState.DECOMMISSIONING,
+        NodeLifeCycleEvent.DECOMMISSION);
+    stateMachine.addTransition(
         NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
     stateMachine.addTransition(
+        NodeState.HEALTHY, NodeState.HEALTHY_READONLY,
+        NodeLifeCycleEvent.LAYOUT_MISMATCH);
+    stateMachine.addTransition(
         NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
     stateMachine.addTransition(
-        NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE);
+        NodeState.STALE, NodeState.HEALTHY_READONLY,
+        NodeLifeCycleEvent.RESTORE);
     stateMachine.addTransition(
-        NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT);
+        NodeState.DEAD, NodeState.HEALTHY_READONLY,
+        NodeLifeCycleEvent.RESURRECT);
     stateMachine.addTransition(
         NodeState.HEALTHY, NodeState.DECOMMISSIONING,
         NodeLifeCycleEvent.DECOMMISSION);
@@ -272,12 +329,15 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * Adds a new node to the state manager.
    *
    * @param datanodeDetails DatanodeDetails
+   * @param layoutInfo LayoutVersionProto
    *
    * @throws NodeAlreadyExistsException if the node is already present
    */
-  public void addNode(DatanodeDetails datanodeDetails)
+  public void addNode(DatanodeDetails datanodeDetails,
+                      LayoutVersionProto layoutInfo)
       throws NodeAlreadyExistsException {
-    nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState());
+    nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState(),
+        layoutInfo);
     eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
   }
 
@@ -324,6 +384,20 @@ public class NodeStateManager implements Runnable, 
Closeable {
   }
 
   /**
+   * Updates the last known layout version of the node.
+   * @param datanodeDetails DataNode Details
+   * @param layoutInfo DataNode Layout Information
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public void updateLastKnownLayoutVersion(DatanodeDetails datanodeDetails,
+                                      LayoutVersionProto layoutInfo)
+      throws NodeNotFoundException {
+    nodeStateMap.getNodeInfo(datanodeDetails.getUuid())
+        .updateLastKnownLayoutVersion(layoutInfo);
+  }
+
+  /**
    * Returns the current state of the node.
    *
    * @param datanodeDetails DatanodeDetails
@@ -343,7 +417,10 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return list of healthy nodes
    */
   public List<DatanodeInfo> getHealthyNodes() {
-    return getNodes(NodeState.HEALTHY);
+    List<DatanodeInfo> allHealthyNodes;
+    allHealthyNodes = getNodes(NodeState.HEALTHY);
+    allHealthyNodes.addAll(getNodes(NodeState.HEALTHY_READONLY));
+    return allHealthyNodes;
   }
 
   /**
@@ -423,7 +500,8 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return healthy node count
    */
   public int getHealthyNodeCount() {
-    return getNodeCount(NodeState.HEALTHY);
+    return getNodeCount(NodeState.HEALTHY) +
+        getNodeCount(NodeState.HEALTHY_READONLY);
   }
 
   /**
@@ -592,6 +670,12 @@ public class NodeStateManager implements Runnable, 
Closeable {
         (lastHbTime) -> lastHbTime < healthyNodeDeadline;
     Predicate<Long> deadNodeCondition =
         (lastHbTime) -> lastHbTime < staleNodeDeadline;
+    Predicate<LayoutVersionProto> layoutMatchCondition =
+        (layout) -> layout.getMetadataLayoutVersion() ==
+            layoutVersionManager.getMetadataLayoutVersion();
+    Predicate<LayoutVersionProto> layoutMisMatchCondition =
+        (layout) -> layout.getMetadataLayoutVersion() !=
+            layoutVersionManager.getMetadataLayoutVersion();
     try {
       for (NodeState state : NodeState.values()) {
         List<UUID> nodes = nodeStateMap.getNodes(state);
@@ -599,11 +683,21 @@ public class NodeStateManager implements Runnable, 
Closeable {
           DatanodeInfo node = nodeStateMap.getNodeInfo(id);
           switch (state) {
           case HEALTHY:
-            // Move the node to STALE if the last heartbeat time is less than
+              // Move the node to STALE if the last heartbeat time is less than
             // configured stale-node interval.
+            updateNodeLayoutVersionState(node, layoutMisMatchCondition, state,
+                NodeLifeCycleEvent.LAYOUT_MISMATCH);
             updateNodeState(node, staleNodeCondition, state,
                 NodeLifeCycleEvent.TIMEOUT);
             break;
+          case HEALTHY_READONLY:
+            // Move the node to STALE if the last heartbeat time is less than
+            // configured stale-node interval.
+            updateNodeLayoutVersionState(node, layoutMatchCondition, state,
+                NodeLifeCycleEvent.LAYOUT_MATCH);
+            updateNodeState(node, staleNodeCondition, state,
+                  NodeLifeCycleEvent.TIMEOUT);
+            break;
           case STALE:
             // Move the node to DEAD if the last heartbeat time is less than
             // configured dead-node interval.
@@ -708,6 +802,36 @@ public class NodeStateManager implements Runnable, 
Closeable {
     }
   }
 
+  /**
+   * Updates the node state if the layout version condition is satisfied.
+   *
+   * @param node DatanodeInfo
+   * @param condition condition to check
+   * @param state current state of node
+   * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition
+   *                       matches
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  private void updateNodeLayoutVersionState(DatanodeInfo node,
+                             Predicate<LayoutVersionProto> condition,
+                             NodeState state, NodeLifeCycleEvent 
lifeCycleEvent)
+      throws NodeNotFoundException {
+    try {
+      if (condition.test(node.getLastKnownLayoutVersion())) {
+        NodeState newState = stateMachine.getNextState(state, lifeCycleEvent);
+        nodeStateMap.updateNodeState(node.getUuid(), state, newState);
+        if (state2EventMap.containsKey(newState)) {
+          eventPublisher.fireEvent(state2EventMap.get(newState), node);
+        }
+      }
+    } catch (InvalidStateTransitionException e) {
+      LOG.warn("Invalid state transition of node {}." +
+              " Current state: {}, life cycle event: {}",
+          node, state, lifeCycleEvent);
+    }
+  }
+
   @Override
   public void close() {
     executorService.shutdown();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToReadOnlyHealthyNodeHandler.java
similarity index 62%
copy from 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
copy to 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToReadOnlyHealthyNodeHandler.java
index cc32f84..7fd36c7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToReadOnlyHealthyNodeHandler.java
@@ -17,31 +17,49 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
+import java.util.Set;
+
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Handles Stale node event.
+ * Handles the non-healthy to healthy (read-only) node event.
  */
-public class NonHealthyToHealthyNodeHandler
+public class NonHealthyToReadOnlyHealthyNodeHandler
     implements EventHandler<DatanodeDetails> {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NonHealthyToReadOnlyHealthyNodeHandler.class);
   private final PipelineManager pipelineManager;
+  private final NodeManager nodeManager;
   private final ConfigurationSource conf;
 
-  public NonHealthyToHealthyNodeHandler(
-      PipelineManager pipelineManager, OzoneConfiguration conf) {
+  public NonHealthyToReadOnlyHealthyNodeHandler(
+      NodeManager nodeManager, PipelineManager pipelineManager,
+      OzoneConfiguration conf) {
     this.pipelineManager = pipelineManager;
+    this.nodeManager = nodeManager;
     this.conf = conf;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    pipelineManager.triggerPipelineCreation();
+    Set<PipelineID> pipelineIds =
+        nodeManager.getPipelines(datanodeDetails);
+    LOG.info("Datanode {} moved to HEALTH READ ONLY state.",
+        datanodeDetails);
+    if (!pipelineIds.isEmpty()) {
+      LOG.error("Datanode {} is part of pipelines {} in HEALTH READ ONLY " +
+              "state.",
+          datanodeDetails, pipelineIds);
+    }
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/ReadOnlyHealthyToHealthyNodeHandler.java
similarity index 82%
rename from 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
rename to 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/ReadOnlyHealthyToHealthyNodeHandler.java
index cc32f84..ea79bf6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/ReadOnlyHealthyToHealthyNodeHandler.java
@@ -23,17 +23,22 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handles Stale node event.
  */
-public class NonHealthyToHealthyNodeHandler
+public class ReadOnlyHealthyToHealthyNodeHandler
     implements EventHandler<DatanodeDetails> {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NonHealthyToReadOnlyHealthyNodeHandler.class);
+
   private final PipelineManager pipelineManager;
   private final ConfigurationSource conf;
 
-  public NonHealthyToHealthyNodeHandler(
+  public ReadOnlyHealthyToHealthyNodeHandler(
       PipelineManager pipelineManager, OzoneConfiguration conf) {
     this.pipelineManager = pipelineManager;
     this.conf = conf;
@@ -42,6 +47,8 @@ public class NonHealthyToHealthyNodeHandler
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
+    LOG.info("Datanode {} moved to HEALTHY state.",
+        datanodeDetails);
     pipelineManager.triggerPipelineCreation();
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 793ba40..86ef122 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -123,7 +123,8 @@ public class SCMNodeManager implements NodeManager {
                         NetworkTopology networkTopology,
                         HDDSLayoutVersionManager layoutVersionManager) {
     this.scmNodeEventPublisher = eventPublisher;
-    this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
+    this.nodeStateManager = new NodeStateManager(conf, eventPublisher,
+        layoutVersionManager);
     this.version = VersionInfo.getLatestVersion();
     this.commandQueue = new CommandQueue();
     this.scmStorageConfig = scmStorageConfig;
@@ -294,7 +295,7 @@ public class SCMNodeManager implements NodeManager {
         }
 
         clusterMap.add(datanodeDetails);
-        nodeStateManager.addNode(datanodeDetails);
+        nodeStateManager.addNode(datanodeDetails, layoutInfo);
         // Check that datanode in nodeStateManager has topology parent set
         DatanodeDetails dn = nodeStateManager.getNode(datanodeDetails);
         Preconditions.checkState(dn.getParent() != null);
@@ -344,14 +345,18 @@ public class SCMNodeManager implements NodeManager {
    * Send heartbeat to indicate the datanode is alive and doing well.
    *
    * @param datanodeDetails - DatanodeDetailsProto.
+   * @param layoutInfo - Layout Version Proto.
    * @return SCMheartbeat response.
    */
   @Override
-  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
+  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+                                           LayoutVersionProto layoutInfo) {
     Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
         "DatanodeDetails.");
     try {
       nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
+      nodeStateManager.updateLastKnownLayoutVersion(datanodeDetails,
+          layoutInfo);
       metrics.incNumHBProcessed();
     } catch (NodeNotFoundException e) {
       metrics.incNumHBProcessingFailed();
@@ -445,12 +450,7 @@ public class SCMNodeManager implements NodeManager {
                 "MetadataLayoutVersion = {}",
             datanodeDetails.getHostName(), dnMlv, scmMlv);
 
-        // TBD: Add NEED_UPGRADE state and fill out state transitions
-        // around this state. Fire event to move this data node to
-        // NEED_UPGRADE state. The DataNode will be considered HEALTHY in
-        // this state but it can not be made part of any Pipeline.
-
-        // Also send Finalize command to the data node. Its OK to
+        // Send Finalize command to the data node. It's OK to
         // send Finalize command multiple times.
         scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
             new CommandForDatanode<>(datanodeDetails.getUuid(),
@@ -493,9 +493,12 @@ public class SCMNodeManager implements NodeManager {
 
     final List<DatanodeInfo> healthyNodes = nodeStateManager
         .getNodes(NodeState.HEALTHY);
+    final List<DatanodeInfo> healthyReadOnlyNodes = nodeStateManager
+        .getNodes(NodeState.HEALTHY_READONLY);
     final List<DatanodeInfo> staleNodes = nodeStateManager
         .getNodes(NodeState.STALE);
     final List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
+    datanodes.addAll(healthyReadOnlyNodes);
     datanodes.addAll(staleNodes);
 
     for (DatanodeInfo dnInfo : datanodes) {
@@ -566,10 +569,13 @@ public class SCMNodeManager implements NodeManager {
 
     List<DatanodeInfo> healthyNodes = nodeStateManager
         .getNodes(NodeState.HEALTHY);
+    List<DatanodeInfo> healthyReadOnlyNodes = nodeStateManager
+        .getNodes(NodeState.HEALTHY_READONLY);
     List<DatanodeInfo> staleNodes = nodeStateManager
         .getNodes(NodeState.STALE);
 
     List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
+    datanodes.addAll(healthyReadOnlyNodes);
     datanodes.addAll(staleNodes);
 
     for (DatanodeInfo dnInfo : datanodes) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
index c515f2f..111c546 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
@@ -37,6 +37,7 @@ import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONED;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
 
 /**
@@ -126,6 +127,10 @@ public final class SCMNodeMetrics implements MetricsSource 
{
                 "HealthyNodes",
                 "Number of healthy datanodes"),
                 nodeCount.get(HEALTHY.toString()))
+            .addGauge(Interns.info(
+                "HealthyReadOnlyNodes",
+                "Number of healthy and read only datanodes"),
+                nodeCount.get(HEALTHY_READONLY.toString()))
             .addGauge(Interns.info("StaleNodes",
                 "Number of stale datanodes"),
                 nodeCount.get(STALE.toString()))
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
index baebef5..3494b03 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm.node.states;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 
@@ -76,10 +78,12 @@ public class NodeStateMap {
    *
    * @param datanodeDetails DatanodeDetails
    * @param nodeState initial NodeState
+   * @param layoutInfo initial LayoutVersionProto
    *
    * @throws NodeAlreadyExistsException if the node already exist
    */
-  public void addNode(DatanodeDetails datanodeDetails, NodeState nodeState)
+  public void addNode(DatanodeDetails datanodeDetails, NodeState nodeState,
+                      LayoutVersionProto layoutInfo)
       throws NodeAlreadyExistsException {
     lock.writeLock().lock();
     try {
@@ -87,7 +91,7 @@ public class NodeStateMap {
       if (nodeMap.containsKey(id)) {
         throw new NodeAlreadyExistsException("Node UUID: " + id);
       }
-      nodeMap.put(id, new DatanodeInfo(datanodeDetails));
+      nodeMap.put(id, new DatanodeInfo(datanodeDetails, layoutInfo));
       nodeToContainer.put(id, ConcurrentHashMap.newKeySet());
       stateMap.get(nodeState).add(id);
     } finally {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 8447b19..89625a0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -109,7 +109,8 @@ public final class SCMDatanodeHeartbeatDispatcher {
       }
 
       // should we dispatch heartbeat through eventPublisher?
-      commands = nodeManager.processHeartbeat(datanodeDetails);
+      commands = nodeManager.processHeartbeat(datanodeDetails,
+          heartbeat.getDataNodeLayoutVersion());
       if (heartbeat.hasNodeReport()) {
         LOG.debug("Dispatching Node Report.");
         eventPublisher.fireEvent(
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 91e8cb9..392ef1a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -76,7 +76,8 @@ import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
-import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler;
+import org.apache.hadoop.hdds.scm.node.NonHealthyToReadOnlyHealthyNodeHandler;
+import org.apache.hadoop.hdds.scm.node.ReadOnlyHealthyToHealthyNodeHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
@@ -304,8 +305,12 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
         pipelineManager, containerManager);
-    NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
-        new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
+    ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler =
+        new ReadOnlyHealthyToHealthyNodeHandler(pipelineManager, conf);
+    NonHealthyToReadOnlyHealthyNodeHandler
+        nonHealthyToReadOnlyHealthyNodeHandler =
+        new NonHealthyToReadOnlyHealthyNodeHandler(scmNodeManager,
+            pipelineManager, conf);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
     PendingDeleteHandler pendingDeleteHandler =
         new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
@@ -351,8 +356,10 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
     eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
     eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
-    eventQueue.addHandler(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE,
-        nonHealthyToHealthyNodeHandler);
+    eventQueue.addHandler(SCMEvents.READ_ONLY_HEALTHY_TO_HEALTHY_NODE,
+        readOnlyHealthyToHealthyNodeHandler);
+    eventQueue.addHandler(SCMEvents.NON_HEALTHY_TO_READONLY_HEALTHY_NODE,
+        nonHealthyToReadOnlyHealthyNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
     eventQueue
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index a3c09d8..96cd832 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -500,10 +500,12 @@ public class MockNodeManager implements NodeManager {
    * Send heartbeat to indicate the datanode is alive and doing well.
    *
    * @param datanodeDetails - Datanode ID.
+   * @param layoutInfo - DataNode Layout info
    * @return SCMheartbeat response list
    */
   @Override
-  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
+  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+                                           LayoutVersionProto layoutInfo) {
     return null;
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index a7f6466..453609a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.apache.hadoop.test.PathUtils;
 
 import org.apache.commons.io.IOUtils;
@@ -162,9 +165,14 @@ public class TestContainerPlacement {
     List<DatanodeDetails> datanodes =
         TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount);
     XceiverClientManager xceiverClientManager = null;
+    LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+    LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+        .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+        .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+        .build();
     try {
       for (DatanodeDetails datanodeDetails : datanodes) {
-        nodeManager.processHeartbeat(datanodeDetails);
+        nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
       }
 
       //TODO: wait for heartbeat to be processed
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 9be6cc9..9f5a988 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
@@ -69,6 +70,7 @@ import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.createDatanode
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.errorNodeNotPermitted;
@@ -170,12 +172,18 @@ public class TestSCMNodeManager {
       throws IOException, InterruptedException, AuthenticationException {
 
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .build();
       int registeredNodes = 5;
       // Send some heartbeats from different nodes.
       for (int x = 0; x < registeredNodes; x++) {
         DatanodeDetails datanodeDetails = TestUtils
             .createRandomDatanodeAndRegister(nodeManager);
-        nodeManager.processHeartbeat(datanodeDetails);
+        nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
       }
 
       //TODO: wait for heartbeat to be processed
@@ -255,10 +263,15 @@ public class TestSCMNodeManager {
     SCMNodeManager nodeManager = createNodeManager(conf);
     DatanodeDetails datanodeDetails = TestUtils
         .createRandomDatanodeAndRegister(nodeManager);
+    LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+    LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+        .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+        .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+        .build();
     nodeManager.close();
 
     // These should never be processed.
-    nodeManager.processHeartbeat(datanodeDetails);
+    nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
 
     // Let us just wait for 2 seconds to prove that HBs are not processed.
     Thread.sleep(2 * 1000);
@@ -281,11 +294,17 @@ public class TestSCMNodeManager {
     final int count = 10;
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
 
       for (int x = 0; x < count; x++) {
         DatanodeDetails datanodeDetails = TestUtils
             .createRandomDatanodeAndRegister(nodeManager);
-        nodeManager.processHeartbeat(datanodeDetails);
+        nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
       }
       //TODO: wait for heartbeat to be processed
       Thread.sleep(4 * 1000);
@@ -339,6 +358,12 @@ public class TestSCMNodeManager {
 
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
       List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount);
 
 
@@ -346,18 +371,18 @@ public class TestSCMNodeManager {
           nodeManager);
 
       // Heartbeat once
-      nodeManager.processHeartbeat(staleNode);
+      nodeManager.processHeartbeat(staleNode, layoutInfo);
 
       // Heartbeat all other nodes.
       for (DatanodeDetails dn : nodeList) {
-        nodeManager.processHeartbeat(dn);
+        nodeManager.processHeartbeat(dn, layoutInfo);
       }
 
       // Wait for 2 seconds .. and heartbeat good nodes again.
       Thread.sleep(2 * 1000);
 
       for (DatanodeDetails dn : nodeList) {
-        nodeManager.processHeartbeat(dn);
+        nodeManager.processHeartbeat(dn, layoutInfo);
       }
 
       // Wait for 2 seconds, wait a total of 4 seconds to make sure that the
@@ -374,7 +399,7 @@ public class TestSCMNodeManager {
 
       // heartbeat good nodes again.
       for (DatanodeDetails dn : nodeList) {
-        nodeManager.processHeartbeat(dn);
+        nodeManager.processHeartbeat(dn, layoutInfo);
       }
 
       //  6 seconds is the dead window for this test , so we wait a total of
@@ -426,13 +451,19 @@ public class TestSCMNodeManager {
         deadNodeInterval, SECONDS);
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
       DatanodeDetails node1 =
           TestUtils.createRandomDatanodeAndRegister(nodeManager);
       DatanodeDetails node2 =
           TestUtils.createRandomDatanodeAndRegister(nodeManager);
 
-      nodeManager.processHeartbeat(node1);
-      nodeManager.processHeartbeat(node2);
+      nodeManager.processHeartbeat(node1, layoutInfo);
+      nodeManager.processHeartbeat(node2, layoutInfo);
 
       // Sleep so that heartbeat processing thread gets to run.
       Thread.sleep(1000);
@@ -478,7 +509,7 @@ public class TestSCMNodeManager {
       assertEquals(2, nodeManager.getNodeCount(HEALTHY));
 
       // Step 5 : heartbeat for node1
-      nodeManager.processHeartbeat(node1);
+      nodeManager.processHeartbeat(node1, layoutInfo);
 
       // Step 6 : wait for health check process to run
       Thread.sleep(1000);
@@ -558,7 +589,7 @@ public class TestSCMNodeManager {
   public void testScmCheckForErrorOnNullDatanodeDetails()
       throws IOException, AuthenticationException {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
-      nodeManager.processHeartbeat(null);
+      nodeManager.processHeartbeat(null, null);
     } catch (NullPointerException npe) {
       GenericTestUtils.assertExceptionContains("Heartbeat is missing " +
           "DatanodeDetails.", npe);
@@ -629,15 +660,21 @@ public class TestSCMNodeManager {
      * Cluster state: Healthy: All nodes are heartbeat-ing like normal.
      */
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
       DatanodeDetails healthyNode =
           TestUtils.createRandomDatanodeAndRegister(nodeManager);
       DatanodeDetails staleNode =
           TestUtils.createRandomDatanodeAndRegister(nodeManager);
       DatanodeDetails deadNode =
           TestUtils.createRandomDatanodeAndRegister(nodeManager);
-      nodeManager.processHeartbeat(healthyNode);
-      nodeManager.processHeartbeat(staleNode);
-      nodeManager.processHeartbeat(deadNode);
+      nodeManager.processHeartbeat(healthyNode, layoutInfo);
+      nodeManager.processHeartbeat(staleNode, layoutInfo);
+      nodeManager.processHeartbeat(deadNode, layoutInfo);
 
       // Sleep so that heartbeat processing thread gets to run.
       Thread.sleep(500);
@@ -663,12 +700,12 @@ public class TestSCMNodeManager {
        * the 3 second windows.
        */
 
-      nodeManager.processHeartbeat(healthyNode);
-      nodeManager.processHeartbeat(staleNode);
-      nodeManager.processHeartbeat(deadNode);
+      nodeManager.processHeartbeat(healthyNode, layoutInfo);
+      nodeManager.processHeartbeat(staleNode, layoutInfo);
+      nodeManager.processHeartbeat(deadNode, layoutInfo);
 
       Thread.sleep(1500);
-      nodeManager.processHeartbeat(healthyNode);
+      nodeManager.processHeartbeat(healthyNode, layoutInfo);
       Thread.sleep(2 * 1000);
       assertEquals(1, nodeManager.getNodeCount(HEALTHY));
 
@@ -688,10 +725,10 @@ public class TestSCMNodeManager {
        * staleNode to move to stale state and deadNode to move to dead state.
        */
 
-      nodeManager.processHeartbeat(healthyNode);
-      nodeManager.processHeartbeat(staleNode);
+      nodeManager.processHeartbeat(healthyNode, layoutInfo);
+      nodeManager.processHeartbeat(staleNode, layoutInfo);
       Thread.sleep(1500);
-      nodeManager.processHeartbeat(healthyNode);
+      nodeManager.processHeartbeat(healthyNode, layoutInfo);
       Thread.sleep(2 * 1000);
 
       // 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -724,9 +761,9 @@ public class TestSCMNodeManager {
        * Cluster State : let us heartbeat all the nodes and verify that we get
        * back all the nodes in healthy state.
        */
-      nodeManager.processHeartbeat(healthyNode);
-      nodeManager.processHeartbeat(staleNode);
-      nodeManager.processHeartbeat(deadNode);
+      nodeManager.processHeartbeat(healthyNode, layoutInfo);
+      nodeManager.processHeartbeat(staleNode, layoutInfo);
+      nodeManager.processHeartbeat(deadNode, layoutInfo);
       Thread.sleep(500);
       //Assert all nodes are healthy.
       assertEquals(3, nodeManager.getAllNodes().size());
@@ -745,9 +782,14 @@ public class TestSCMNodeManager {
   private void heartbeatNodeSet(SCMNodeManager manager,
                                 List<DatanodeDetails> list,
                                 int sleepDuration) throws InterruptedException 
{
+    LayoutVersionManager versionManager = manager.getLayoutVersionManager();
+    LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+        .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+        .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+        .build();
     while (!Thread.currentThread().isInterrupted()) {
       for (DatanodeDetails dn : list) {
-        manager.processHeartbeat(dn);
+        manager.processHeartbeat(dn, layoutInfo);
       }
       Thread.sleep(sleepDuration);
     }
@@ -830,11 +872,17 @@ public class TestSCMNodeManager {
         }
       };
 
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
 
       // No Thread just one time HBs the node manager, so that these will be
       // marked as dead nodes eventually.
       for (DatanodeDetails dn : deadNodeList) {
-        nodeManager.processHeartbeat(dn);
+        nodeManager.processHeartbeat(dn, layoutInfo);
       }
 
 
@@ -958,6 +1006,12 @@ public class TestSCMNodeManager {
     final long remaining = capacity - used;
     List<DatanodeDetails> dnList = new ArrayList<>(nodeCount);
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
       EventQueue eventQueue = (EventQueue) scm.getEventQueue();
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
@@ -969,11 +1023,13 @@ public class TestSCMNodeManager {
             .createStorageReport(dnId, storagePath, capacity, used, free, 
null);
         nodeManager.register(dn, TestUtils.createNodeReport(report), null,
             null);
-        nodeManager.processHeartbeat(dn);
+        nodeManager.processHeartbeat(dn, layoutInfo);
       }
       //TODO: wait for EventQueue to be processed
       eventQueue.processAll(8000L);
 
+      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY_READONLY));
+      Thread.sleep(3 * 1000);
       assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
       assertEquals(capacity * nodeCount, (long) nodeManager.getStats()
           .getCapacity().get());
@@ -1020,10 +1076,18 @@ public class TestSCMNodeManager {
         failed = !failed;
       }
       nodeManager.register(dn, TestUtils.createNodeReport(reports), null, 
null);
-      nodeManager.processHeartbeat(dn);
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
+      nodeManager.processHeartbeat(dn, layoutInfo);
       //TODO: wait for EventQueue to be processed
       eventQueue.processAll(8000L);
 
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY_READONLY));
+      Thread.sleep(3 * 1000);
       assertEquals(1, nodeManager.getNodeCount(HEALTHY));
       assertEquals(volumeCount / 2,
               nodeManager.minHealthyVolumeNum(dnList));
@@ -1073,7 +1137,13 @@ public class TestSCMNodeManager {
         nodeReportHandler.onMessage(
                 new NodeReportFromDatanode(datanodeDetails, nodeReportProto),
                 publisher);
-        nodeManager.processHeartbeat(datanodeDetails);
+        LayoutVersionManager versionManager =
+            nodeManager.getLayoutVersionManager();
+        LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+            .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+            .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+            .build();
+        nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
         Thread.sleep(100);
       }
 
@@ -1148,7 +1218,14 @@ public class TestSCMNodeManager {
       foundRemaining = nodeManager.getStats().getRemaining().get();
       assertEquals(0, foundRemaining);
 
-      nodeManager.processHeartbeat(datanodeDetails);
+      LayoutVersionManager versionManager =
+          nodeManager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
+
+      nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
 
       // Wait up to 5 seconds so that the dead node becomes healthy
       // Verify usage info should be updated.
@@ -1196,9 +1273,15 @@ public class TestSCMNodeManager {
               new CloseContainerCommand(1L,
                   PipelineID.randomId())));
 
+      LayoutVersionManager versionManager =
+          nodemanager.getLayoutVersionManager();
+      LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+          .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+          .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+          .build();
       eq.processAll(1000L);
       List<SCMCommand> command =
-          nodemanager.processHeartbeat(datanodeDetails);
+          nodemanager.processHeartbeat(datanodeDetails, layoutInfo);
       // With dh registered, SCM will send create pipeline command to dn
       Assert.assertTrue(command.size() >= 1);
       Assert.assertTrue(command.get(0).getClass().equals(
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
index e07edc4..4c47a07 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -38,6 +40,7 @@ import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.apache.hadoop.security.authentication.client
     .AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -120,13 +123,19 @@ public class TestStatisticsUpdate {
 
     //TODO: Support logic to mark a node as dead in NodeManager.
 
-    nodeManager.processHeartbeat(datanode2);
+    LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+    StorageContainerDatanodeProtocolProtos.LayoutVersionProto layoutInfo =
+        StorageContainerDatanodeProtocolProtos.LayoutVersionProto.newBuilder()
+        .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+        .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+        .build();
+    nodeManager.processHeartbeat(datanode2, layoutInfo);
     Thread.sleep(1000);
-    nodeManager.processHeartbeat(datanode2);
+    nodeManager.processHeartbeat(datanode2, layoutInfo);
     Thread.sleep(1000);
-    nodeManager.processHeartbeat(datanode2);
+    nodeManager.processHeartbeat(datanode2, layoutInfo);
     Thread.sleep(1000);
-    nodeManager.processHeartbeat(datanode2);
+    nodeManager.processHeartbeat(datanode2, layoutInfo);
     //THEN statistics in SCM should changed.
     stat = nodeManager.getStats();
     Assert.assertEquals(200L, stat.getCapacity().get().longValue());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index a30de35..b917309 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -276,10 +276,12 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * Send heartbeat to indicate the datanode is alive and doing well.
    *
    * @param dd - Datanode Details.
+   * @param layoutInfo - Layout Version Proto
    * @return SCMheartbeat response list
    */
   @Override
-  public List<SCMCommand> processHeartbeat(DatanodeDetails dd) {
+  public List<SCMCommand> processHeartbeat(DatanodeDetails dd,
+                                           LayoutVersionProto layoutInfo) {
     return null;
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
index d20c55b..83b4bb0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
@@ -38,9 +40,12 @@ import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 
+import static java.lang.Thread.sleep;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
@@ -102,7 +107,12 @@ public class TestSCMNodeMetrics {
 
     NodeReportProto nodeReport = createNodeReport();
 
-    nodeManager.processHeartbeat(registeredDatanode);
+    LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+    LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+        .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+        .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+        .build();
+    nodeManager.processHeartbeat(registeredDatanode, layoutInfo);
 
     assertEquals("NumHBProcessed", hbProcessed + 1,
         getCounter("NumHBProcessed"));
@@ -116,8 +126,13 @@ public class TestSCMNodeMetrics {
 
     long hbProcessedFailed = getCounter("NumHBProcessingFailed");
 
+    LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+    LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+        .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+        .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+        .build();
     nodeManager.processHeartbeat(MockDatanodeDetails
-        .randomDatanodeDetails());
+        .randomDatanodeDetails(), layoutInfo);
 
     assertEquals("NumHBProcessingFailed", hbProcessedFailed + 1,
         getCounter("NumHBProcessingFailed"));
@@ -182,7 +197,7 @@ public class TestSCMNodeMetrics {
 
     MetricsRecordBuilder metricsSource = getMetrics(SCMNodeMetrics.SOURCE_NAME);
 
-    assertGauge("HealthyNodes", 1, metricsSource);
+    assertGauge("HealthyReadOnlyNodes", 1, metricsSource);
     assertGauge("StaleNodes", 0, metricsSource);
     assertGauge("DeadNodes", 0, metricsSource);
     assertGauge("DecommissioningNodes", 0, metricsSource);
@@ -194,6 +209,17 @@ public class TestSCMNodeMetrics {
     assertGauge("SSDUsed", 0L, metricsSource);
     assertGauge("SSDRemaining", 0L, metricsSource);
 
+    LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+    LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
+        .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+        .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+        .build();
+    nodeManager.processHeartbeat(registeredDatanode, layoutInfo);
+    sleep(4000);
+    metricsSource = getMetrics(SCMNodeMetrics.SOURCE_NAME);
+    assertGauge("HealthyReadOnlyNodes", 0, metricsSource);
+    assertGauge("HealthyNodes", 1, metricsSource);
+
   }
 
   private long getCounter(String metricName) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 2b492a2..fc5b1f5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -85,6 +87,7 @@ import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -372,8 +375,16 @@ public class TestStorageContainerManager {
       GenericTestUtils.waitFor(() -> {
         NodeManager nodeManager = cluster.getStorageContainerManager()
             .getScmNodeManager();
+        LayoutVersionManager versionManager =
+            nodeManager.getLayoutVersionManager();
+        StorageContainerDatanodeProtocolProtos.LayoutVersionProto layoutInfo
+            = StorageContainerDatanodeProtocolProtos.LayoutVersionProto
+            .newBuilder()
+            .setSoftwareLayoutVersion(versionManager.getSoftwareLayoutVersion())
+            .setMetadataLayoutVersion(versionManager.getMetadataLayoutVersion())
+            .build();
         List<SCMCommand> commands = nodeManager.processHeartbeat(
-            nodeManager.getNodes(NodeState.HEALTHY).get(0));
+            nodeManager.getNodes(NodeState.HEALTHY).get(0), layoutInfo);
 
         if (commands != null) {
           for (SCMCommand cmd : commands) {
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index de0028c..ec931f4 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -80,7 +80,9 @@ public class ClusterStateEndpoint {
     List<DatanodeDetails> datanodeDetails = nodeManager.getAllNodes();
     int containers = this.containerManager.getContainerIDs().size();
     int pipelines = this.pipelineManager.getPipelines().size();
-    int healthyDatanodes = nodeManager.getNodeCount(NodeState.HEALTHY);
+    int healthyDatanodes =
+        nodeManager.getNodeCount(NodeState.HEALTHY) +
+            nodeManager.getNodeCount(NodeState.HEALTHY_READONLY);
     SCMNodeStat stats = nodeManager.getStats();
     DatanodeStorageReport storageReport =
         new DatanodeStorageReport(stats.getCapacity().get(),
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index 7283f5e..55b1d90 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -27,6 +27,8 @@ import java.util.UUID;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
@@ -126,12 +128,14 @@ public class ReconNodeManager extends SCMNodeManager {
    * Send heartbeat to indicate the datanode is alive and doing well.
    *
    * @param datanodeDetails - DatanodeDetailsProto.
+   * @param layoutInfo - Layout Version Proto
    * @return SCMheartbeat response.
    */
   @Override
-  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
+  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+                                           LayoutVersionProto layoutInfo) {
     // Update heartbeat map with current time
     datanodeHeartbeatMap.put(datanodeDetails.getUuid(), Time.now());
-    return super.processHeartbeat(datanodeDetails);
+    return super.processHeartbeat(datanodeDetails, layoutInfo);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to