http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
deleted file mode 100644
index 88f984b..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.*;
-import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.common.statemachine
-    .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.util.*;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DEADNODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_STALENODE_INTERVAL;
-
-/**
- * NodeStateManager maintains the state of all the datanodes in the cluster. 
All
- * the node state change should happen only via NodeStateManager. It also
- * runs a heartbeat thread which periodically updates the node state.
- * <p>
- * The getNode(byState) functions make copy of node maps and then creates a 
list
- * based on that. It should be assumed that these get functions always report
- * *stale* information. For example, getting the deadNodeCount followed by
- * getNodes(DEAD) could very well produce totally different count. Also
- * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not
- * guaranteed to add up to the total nodes that we know off. Please treat all
- * get functions in this file as a snap-shot of information that is 
inconsistent
- * as soon as you read it.
- */
-public class NodeStateManager implements Runnable, Closeable {
-
-  /**
-   * Node's life cycle events.
-   */
-  private enum NodeLifeCycleEvent {
-    TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED
-  }
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(NodeStateManager.class);
-
-  /**
-   * StateMachine for node lifecycle.
-   */
-  private final StateMachine<NodeState, NodeLifeCycleEvent> stateMachine;
-  /**
-   * This is the map which maintains the current state of all datanodes.
-   */
-  private final NodeStateMap nodeStateMap;
-  /**
-   * Maintains the mapping from node to pipelines a node is part of.
-   */
-  private final Node2PipelineMap node2PipelineMap;
-  /**
-   * Maintains the map from node to ContainerIDs for the containers
-   * available on the node.
-   */
-  private final Node2ContainerMap node2ContainerMap;
-  /**
-   * Used for publishing node state change events.
-   */
-  private final EventPublisher eventPublisher;
-  /**
-   * Maps the event to be triggered when a node state us updated.
-   */
-  private final Map<NodeState, Event<DatanodeDetails>> state2EventMap;
-  /**
-   * ExecutorService used for scheduling heartbeat processing thread.
-   */
-  private final ScheduledExecutorService executorService;
-  /**
-   * The frequency in which we have run the heartbeat processing thread.
-   */
-  private final long heartbeatCheckerIntervalMs;
-  /**
-   * The timeout value which will be used for marking a datanode as stale.
-   */
-  private final long staleNodeIntervalMs;
-  /**
-   * The timeout value which will be used for marking a datanode as dead.
-   */
-  private final long deadNodeIntervalMs;
-
-  /**
-   * Constructs a NodeStateManager instance with the given configuration.
-   *
-   * @param conf Configuration
-   */
-  public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
-    this.nodeStateMap = new NodeStateMap();
-    this.node2PipelineMap = new Node2PipelineMap();
-    this.node2ContainerMap = new Node2ContainerMap();
-    this.eventPublisher = eventPublisher;
-    this.state2EventMap = new HashMap<>();
-    initialiseState2EventMap();
-    Set<NodeState> finalStates = new HashSet<>();
-    finalStates.add(NodeState.DECOMMISSIONED);
-    this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates);
-    initializeStateMachine();
-    heartbeatCheckerIntervalMs = HddsServerUtil
-        .getScmheartbeatCheckerInterval(conf);
-    staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf);
-    deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf);
-    Preconditions.checkState(heartbeatCheckerIntervalMs > 0,
-        OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + " should be greater than 0.");
-    Preconditions.checkState(staleNodeIntervalMs < deadNodeIntervalMs,
-        OZONE_SCM_STALENODE_INTERVAL + " should be less than" +
-            OZONE_SCM_DEADNODE_INTERVAL);
-    executorService = HadoopExecutors.newScheduledThreadPool(1,
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
-    executorService.schedule(this, heartbeatCheckerIntervalMs,
-        TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Populates state2event map.
-   */
-  private void initialiseState2EventMap() {
-    state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
-    state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
-  }
-
-  /*
-   *
-   * Node and State Transition Mapping:
-   *
-   * State: HEALTHY         -------------------> STALE
-   * Event:                       TIMEOUT
-   *
-   * State: STALE           -------------------> DEAD
-   * Event:                       TIMEOUT
-   *
-   * State: STALE           -------------------> HEALTHY
-   * Event:                       RESTORE
-   *
-   * State: DEAD            -------------------> HEALTHY
-   * Event:                       RESURRECT
-   *
-   * State: HEALTHY         -------------------> DECOMMISSIONING
-   * Event:                     DECOMMISSION
-   *
-   * State: STALE           -------------------> DECOMMISSIONING
-   * Event:                     DECOMMISSION
-   *
-   * State: DEAD            -------------------> DECOMMISSIONING
-   * Event:                     DECOMMISSION
-   *
-   * State: DECOMMISSIONING -------------------> DECOMMISSIONED
-   * Event:                     DECOMMISSIONED
-   *
-   *  Node State Flow
-   *
-   *  +--------------------------------------------------------+
-   *  |                                     (RESURRECT)        |
-   *  |   +--------------------------+                         |
-   *  |   |      (RESTORE)           |                         |
-   *  |   |                          |                         |
-   *  V   V                          |                         |
-   * [HEALTHY]------------------->[STALE]------------------->[DEAD]
-   *    |         (TIMEOUT)          |         (TIMEOUT)       |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    | (DECOMMISSION)             | (DECOMMISSION)          | (DECOMMISSION)
-   *    |                            V                         |
-   *    +------------------->[DECOMMISSIONING]<----------------+
-   *                                 |
-   *                                 | (DECOMMISSIONED)
-   *                                 |
-   *                                 V
-   *                          [DECOMMISSIONED]
-   *
-   */
-
-  /**
-   * Initializes the lifecycle of node state machine.
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(
-        NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
-    stateMachine.addTransition(
-        NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
-    stateMachine.addTransition(
-        NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE);
-    stateMachine.addTransition(
-        NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT);
-    stateMachine.addTransition(
-        NodeState.HEALTHY, NodeState.DECOMMISSIONING,
-        NodeLifeCycleEvent.DECOMMISSION);
-    stateMachine.addTransition(
-        NodeState.STALE, NodeState.DECOMMISSIONING,
-        NodeLifeCycleEvent.DECOMMISSION);
-    stateMachine.addTransition(
-        NodeState.DEAD, NodeState.DECOMMISSIONING,
-        NodeLifeCycleEvent.DECOMMISSION);
-    stateMachine.addTransition(
-        NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
-        NodeLifeCycleEvent.DECOMMISSIONED);
-
-  }
-
-  /**
-   * Adds a new node to the state manager.
-   *
-   * @param datanodeDetails DatanodeDetails
-   *
-   * @throws NodeAlreadyExistsException if the node is already present
-   */
-  public void addNode(DatanodeDetails datanodeDetails)
-      throws NodeAlreadyExistsException {
-    nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState());
-    eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
-  }
-
-  /**
-   * Adds a pipeline in the node2PipelineMap.
-   * @param pipeline - Pipeline to be added
-   */
-  public void addPipeline(Pipeline pipeline) {
-    node2PipelineMap.addPipeline(pipeline);
-  }
-
-  /**
-   * Get information about the node.
-   *
-   * @param datanodeDetails DatanodeDetails
-   *
-   * @return DatanodeInfo
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public DatanodeInfo getNode(DatanodeDetails datanodeDetails)
-      throws NodeNotFoundException {
-    return nodeStateMap.getNodeInfo(datanodeDetails.getUuid());
-  }
-
-  /**
-   * Updates the last heartbeat time of the node.
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public void updateLastHeartbeatTime(DatanodeDetails datanodeDetails)
-      throws NodeNotFoundException {
-    nodeStateMap.getNodeInfo(datanodeDetails.getUuid())
-        .updateLastHeartbeatTime();
-  }
-
-  /**
-   * Returns the current state of the node.
-   *
-   * @param datanodeDetails DatanodeDetails
-   *
-   * @return NodeState
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public NodeState getNodeState(DatanodeDetails datanodeDetails)
-      throws NodeNotFoundException {
-    return nodeStateMap.getNodeState(datanodeDetails.getUuid());
-  }
-
-  /**
-   * Returns all the node which are in healthy state.
-   *
-   * @return list of healthy nodes
-   */
-  public List<DatanodeDetails> getHealthyNodes() {
-    return getNodes(NodeState.HEALTHY);
-  }
-
-  /**
-   * Returns all the node which are in stale state.
-   *
-   * @return list of stale nodes
-   */
-  public List<DatanodeDetails> getStaleNodes() {
-    return getNodes(NodeState.STALE);
-  }
-
-  /**
-   * Returns all the node which are in dead state.
-   *
-   * @return list of dead nodes
-   */
-  public List<DatanodeDetails> getDeadNodes() {
-    return getNodes(NodeState.DEAD);
-  }
-
-  /**
-   * Returns all the node which are in the specified state.
-   *
-   * @param state NodeState
-   *
-   * @return list of nodes
-   */
-  public List<DatanodeDetails> getNodes(NodeState state) {
-    List<DatanodeDetails> nodes = new LinkedList<>();
-    nodeStateMap.getNodes(state).forEach(
-        uuid -> {
-          try {
-            nodes.add(nodeStateMap.getNodeDetails(uuid));
-          } catch (NodeNotFoundException e) {
-            // This should not happen unless someone else other than
-            // NodeStateManager is directly modifying NodeStateMap and removed
-            // the node entry after we got the list of UUIDs.
-            LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
-          }
-        });
-    return nodes;
-  }
-
-  /**
-   * Returns all the nodes which have registered to NodeStateManager.
-   *
-   * @return all the managed nodes
-   */
-  public List<DatanodeDetails> getAllNodes() {
-    List<DatanodeDetails> nodes = new LinkedList<>();
-    nodeStateMap.getAllNodes().forEach(
-        uuid -> {
-          try {
-            nodes.add(nodeStateMap.getNodeDetails(uuid));
-          } catch (NodeNotFoundException e) {
-            // This should not happen unless someone else other than
-            // NodeStateManager is directly modifying NodeStateMap and removed
-            // the node entry after we got the list of UUIDs.
-            LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
-          }
-        });
-    return nodes;
-  }
-
-  /**
-   * Gets set of pipelineID a datanode belongs to.
-   * @param dnId - Datanode ID
-   * @return Set of PipelineID
-   */
-  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
-    return node2PipelineMap.getPipelines(dnId);
-  }
-
-  /**
-   * Returns the count of healthy nodes.
-   *
-   * @return healthy node count
-   */
-  public int getHealthyNodeCount() {
-    return getNodeCount(NodeState.HEALTHY);
-  }
-
-  /**
-   * Returns the count of stale nodes.
-   *
-   * @return stale node count
-   */
-  public int getStaleNodeCount() {
-    return getNodeCount(NodeState.STALE);
-  }
-
-  /**
-   * Returns the count of dead nodes.
-   *
-   * @return dead node count
-   */
-  public int getDeadNodeCount() {
-    return getNodeCount(NodeState.DEAD);
-  }
-
-  /**
-   * Returns the count of nodes in specified state.
-   *
-   * @param state NodeState
-   *
-   * @return node count
-   */
-  public int getNodeCount(NodeState state) {
-    return nodeStateMap.getNodeCount(state);
-  }
-
-  /**
-   * Returns the count of all nodes managed by NodeStateManager.
-   *
-   * @return node count
-   */
-  public int getTotalNodeCount() {
-    return nodeStateMap.getTotalNodeCount();
-  }
-
-  /**
-   * Removes a node from NodeStateManager.
-   *
-   * @param datanodeDetails DatanodeDetails
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public void removeNode(DatanodeDetails datanodeDetails)
-      throws NodeNotFoundException {
-    nodeStateMap.removeNode(datanodeDetails.getUuid());
-  }
-
-  /**
-   * Returns the current stats of the node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
-    return nodeStateMap.getNodeStat(uuid);
-  }
-
-  /**
-   * Returns a unmodifiable copy of nodeStats.
-   * @return map with node stats.
-   */
-  public Map<UUID, SCMNodeStat> getNodeStatsMap() {
-    return nodeStateMap.getNodeStats();
-  }
-
-  /**
-   * Set the stat for the node.
-   *
-   * @param uuid node id.
-   *
-   * @param newstat new stat that will set to the specify node.
-   */
-  public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
-    nodeStateMap.setNodeStat(uuid, newstat);
-  }
-
-  /**
-   * Remove the current stats of the specify node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat the stat removed from the node.
-   *
-   * @throws NodeNotFoundException if the node is not present.
-   */
-  public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
-    return nodeStateMap.removeNodeStat(uuid);
-  }
-
-  /**
-   * Removes a pipeline from the node2PipelineMap.
-   * @param pipeline - Pipeline to be removed
-   */
-  public void removePipeline(Pipeline pipeline) {
-    node2PipelineMap.removePipeline(pipeline);
-  }
-  /**
-   * Update set of containers available on a datanode.
-   * @param uuid - DatanodeID
-   * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
-   */
-  public void setContainersForDatanode(UUID uuid, Set<ContainerID> 
containerIds)
-      throws SCMException {
-    node2ContainerMap.setContainersForDatanode(uuid, containerIds);
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
-    return node2ContainerMap.processReport(uuid, containerIds);
-  }
-
-  /**
-   * Return set of containerIDs available on a datanode.
-   * @param uuid - DatanodeID
-   * @return - set of containerIDs
-   */
-  public Set<ContainerID> getContainers(UUID uuid) {
-    return node2ContainerMap.getContainers(uuid);
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
-    node2ContainerMap.insertNewDatanode(uuid, containerIDs);
-  }
-
-  /**
-   * Move Stale or Dead node to healthy if we got a heartbeat from them.
-   * Move healthy nodes to stale nodes if it is needed.
-   * Move Stales node to dead if needed.
-   *
-   * @see Thread#run()
-   */
-  @Override
-  public void run() {
-
-    /*
-     *
-     *          staleNodeDeadline                healthyNodeDeadline
-     *                 |                                  |
-     *      Dead       |             Stale                |     Healthy
-     *      Node       |             Node                 |     Node
-     *      Window     |             Window               |     Window
-     * ----------------+----------------------------------+------------------->
-     *                      >>-->> time-line >>-->>
-     *
-     * Here is the logic of computing the health of a node.
-     *
-     * 1. We get the current time and look back that the time
-     *    when we got a heartbeat from a node.
-     * 
-     * 2. If the last heartbeat was within the window of healthy 
node we mark
-     *    it as healthy.
-     * 
-     * 3. If the last HB Time stamp is longer and falls within the 
window of
-     *    Stale Node time, we will mark it as Stale.
-     * 
-     * 4. If the last HB time is older than the Stale Window, then 
the node is
-     *    marked as dead.
-     *
-     * The Processing starts from current time and looks backwards in time.
-     */
-    long processingStartTime = Time.monotonicNow();
-    // After this time node is considered to be stale.
-    long healthyNodeDeadline = processingStartTime - staleNodeIntervalMs;
-    // After this time node is considered to be dead.
-    long staleNodeDeadline = processingStartTime - deadNodeIntervalMs;
-
-    Predicate<Long> healthyNodeCondition =
-        (lastHbTime) -> lastHbTime >= healthyNodeDeadline;
-    // staleNodeCondition is superset of stale and dead node
-    Predicate<Long> staleNodeCondition =
-        (lastHbTime) -> lastHbTime < healthyNodeDeadline;
-    Predicate<Long> deadNodeCondition =
-        (lastHbTime) -> lastHbTime < staleNodeDeadline;
-    try {
-      for (NodeState state : NodeState.values()) {
-        List<UUID> nodes = nodeStateMap.getNodes(state);
-        for (UUID id : nodes) {
-          DatanodeInfo node = nodeStateMap.getNodeInfo(id);
-          switch (state) {
-          case HEALTHY:
-            // Move the node to STALE if the last heartbeat time is less than
-            // configured stale-node interval.
-            updateNodeState(node, staleNodeCondition, state,
-                  NodeLifeCycleEvent.TIMEOUT);
-            break;
-          case STALE:
-            // Move the node to DEAD if the last heartbeat time is less than
-            // configured dead-node interval.
-            updateNodeState(node, deadNodeCondition, state,
-                NodeLifeCycleEvent.TIMEOUT);
-            // Restore the node if we have received heartbeat before configured
-            // stale-node interval.
-            updateNodeState(node, healthyNodeCondition, state,
-                NodeLifeCycleEvent.RESTORE);
-            break;
-          case DEAD:
-            // Resurrect the node if we have received heartbeat before
-            // configured stale-node interval.
-            updateNodeState(node, healthyNodeCondition, state,
-                NodeLifeCycleEvent.RESURRECT);
-            break;
-            // We don't do anything for DECOMMISSIONING and DECOMMISSIONED in
-            // heartbeat processing.
-          case DECOMMISSIONING:
-          case DECOMMISSIONED:
-          default:
-          }
-        }
-      }
-    } catch (NodeNotFoundException e) {
-      // This should not happen unless someone else other than
-      // NodeStateManager is directly modifying NodeStateMap and removed
-      // the node entry after we got the list of UUIDs.
-      LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
-    }
-    long processingEndTime = Time.monotonicNow();
-    //If we have taken too much time for HB processing, log that information.
-    if ((processingEndTime - processingStartTime) >
-        heartbeatCheckerIntervalMs) {
-      LOG.error("Total time spend processing datanode HB's is greater than " +
-              "configured values for datanode heartbeats. Please adjust the" +
-              " heartbeat configs. Time Spend on HB processing: {} seconds " +
-              "Datanode heartbeat Interval: {} seconds.",
-          TimeUnit.MILLISECONDS
-              .toSeconds(processingEndTime - processingStartTime),
-          heartbeatCheckerIntervalMs);
-    }
-
-    // we purposefully make this non-deterministic. Instead of using a
-    // scheduleAtFixedFrequency  we will just go to sleep
-    // and wake up at the next rendezvous point, which is currentTime +
-    // heartbeatCheckerIntervalMs. This leads to the issue that we are now
-    // heart beating not at a fixed cadence, but clock tick + time taken to
-    // work.
-    //
-    // This time taken to work can skew the heartbeat processor thread.
-    // The reason why we don't care is because of the following reasons.
-    //
-    // 1. checkerInterval is general many magnitudes faster than datanode HB
-    // frequency.
-    //
-    // 2. if we have too much nodes, the SCM would be doing only HB
-    // processing, this could lead to SCM's CPU starvation. With this
-    // approach we always guarantee that  HB thread sleeps for a little while.
-    //
-    // 3. It is possible that we will never finish processing the HB's in the
-    // thread. But that means we have a mis-configured system. We will warn
-    // the users by logging that information.
-    //
-    // 4. And the most important reason, heartbeats are not blocked even if
-    // this thread does not run, they will go into the processing queue.
-
-    if (!Thread.currentThread().isInterrupted() &&
-        !executorService.isShutdown()) {
-      executorService.schedule(this, heartbeatCheckerIntervalMs,
-          TimeUnit.MILLISECONDS);
-    } else {
-      LOG.info("Current Thread is interrupted, shutting down HB processing " +
-          "thread for Node Manager.");
-    }
-
-  }
-
-  /**
-   * Updates the node state if the condition satisfies.
-   *
-   * @param node DatanodeInfo
-   * @param condition condition to check
-   * @param state current state of node
-   * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition
-   *                       matches
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  private void updateNodeState(DatanodeInfo node, Predicate<Long> condition,
-      NodeState state, NodeLifeCycleEvent lifeCycleEvent)
-      throws NodeNotFoundException {
-    try {
-      if (condition.test(node.getLastHeartbeatTime())) {
-        NodeState newState = stateMachine.getNextState(state, lifeCycleEvent);
-        nodeStateMap.updateNodeState(node.getUuid(), state, newState);
-        if (state2EventMap.containsKey(newState)) {
-          eventPublisher.fireEvent(state2EventMap.get(newState), node);
-        }
-      }
-    } catch (InvalidStateTransitionException e) {
-      LOG.warn("Invalid state transition of node {}." +
-              " Current state: {}, life cycle event: {}",
-          node, state, lifeCycleEvent);
-    }
-  }
-
-  @Override
-  public void close() {
-    executorService.shutdown();
-    try {
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        executorService.shutdownNow();
-      }
-
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        LOG.error("Unable to shutdown NodeStateManager properly.");
-      }
-    } catch (InterruptedException e) {
-      executorService.shutdownNow();
-      Thread.currentThread().interrupt();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
deleted file mode 100644
index 36a6f15..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.node;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.VersionInfo;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
-    .ErrorCode;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
-import org.apache.hadoop.ozone.protocol.VersionResponse;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
-import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Maintains information about the Datanodes on SCM side.
- * <p>
- * Heartbeats under SCM is very simple compared to HDFS heartbeatManager.
- * <p>
- * The getNode(byState) functions make copy of node maps and then creates a 
list
- * based on that. It should be assumed that these get functions always report
- * *stale* information. For example, getting the deadNodeCount followed by
- * getNodes(DEAD) could very well produce totally different count. Also
- * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not
- * guaranteed to add up to the total nodes that we know off. Please treat all
- * get functions in this file as a snap-shot of information that is 
inconsistent
- * as soon as you read it.
- */
-public class SCMNodeManager
-    implements NodeManager, StorageContainerNodeProtocol {
-
-  @VisibleForTesting
-  static final Logger LOG =
-      LoggerFactory.getLogger(SCMNodeManager.class);
-
-  private final NodeStateManager nodeStateManager;
-  // Should we maintain aggregated stats? If this is not frequently used, we
-  // can always calculate it from nodeStats whenever required.
-  // Aggregated node stats
-  private SCMNodeStat scmStat;
-  // Should we create ChillModeManager and extract all the chill mode logic
-  // to a new class?
-  private int chillModeNodeCount;
-  private final String clusterID;
-  private final VersionInfo version;
-  /**
-   * During start up of SCM, it will enter into chill mode and will be there
-   * until number of Datanodes registered reaches {@code chillModeNodeCount}.
-   * This flag is for tracking startup chill mode.
-   */
-  private AtomicBoolean inStartupChillMode;
-  /**
-   * Administrator can put SCM into chill mode manually.
-   * This flag is for tracking manual chill mode.
-   */
-  private AtomicBoolean inManualChillMode;
-  private final CommandQueue commandQueue;
-  // Node manager MXBean
-  private ObjectName nmInfoBean;
-
-  // Node pool manager.
-  private final StorageContainerManager scmManager;
-
-  /**
-   * Constructs SCM machine Manager.
-   */
-  public SCMNodeManager(OzoneConfiguration conf, String clusterID,
-      StorageContainerManager scmManager, EventPublisher eventPublisher)
-      throws IOException {
-    this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
-    this.scmStat = new SCMNodeStat();
-    this.clusterID = clusterID;
-    this.version = VersionInfo.getLatestVersion();
-    this.commandQueue = new CommandQueue();
-    // TODO: Support this value as a Percentage of known machines.
-    this.chillModeNodeCount = 1;
-    this.inStartupChillMode = new AtomicBoolean(true);
-    this.inManualChillMode = new AtomicBoolean(false);
-    this.scmManager = scmManager;
-    LOG.info("Entering startup chill mode.");
-    registerMXBean();
-  }
-
-  private void registerMXBean() {
-    this.nmInfoBean = MBeans.register("SCMNodeManager",
-        "SCMNodeManagerInfo", this);
-  }
-
-  private void unregisterMXBean() {
-    if(this.nmInfoBean != null) {
-      MBeans.unregister(this.nmInfoBean);
-      this.nmInfoBean = null;
-    }
-  }
-
-  /**
-   * Removes a data node from the management of this Node Manager.
-   *
-   * @param node - DataNode.
-   * @throws NodeNotFoundException
-   */
-  @Override
-  public void removeNode(DatanodeDetails node) throws NodeNotFoundException {
-    nodeStateManager.removeNode(node);
-  }
-
-  /**
-   * Gets all datanodes that are in a certain state. This function works by
-   * taking a snapshot of the current collection and then returning the list
-   * from that collection. This means that real map might have changed by the
-   * time we return this list.
-   *
-   * @return List of Datanodes that are known to SCM in the requested state.
-   */
-  @Override
-  public List<DatanodeDetails> getNodes(NodeState nodestate) {
-    return nodeStateManager.getNodes(nodestate);
-  }
-
-  /**
-   * Returns all datanodes that are known to SCM.
-   *
-   * @return List of DatanodeDetails
-   */
-  @Override
-  public List<DatanodeDetails> getAllNodes() {
-    return nodeStateManager.getAllNodes();
-  }
-
-  /**
-   * Get the minimum number of nodes to get out of Chill mode.
-   *
-   * @return int
-   */
-  @Override
-  public int getMinimumChillModeNodes() {
-    return chillModeNodeCount;
-  }
-
-  /**
-   * Sets the Minimum chill mode nodes count, used only in testing.
-   *
-   * @param count - Number of nodes.
-   */
-  @VisibleForTesting
-  public void setMinimumChillModeNodes(int count) {
-    chillModeNodeCount = count;
-  }
-
-  /**
-   * Returns chill mode Status string.
-   * @return String
-   */
-  @Override
-  public String getChillModeStatus() {
-    if (inStartupChillMode.get()) {
-      return "Still in chill mode, waiting on nodes to report in." +
-          String.format(" %d nodes reported, minimal %d nodes required.",
-              nodeStateManager.getTotalNodeCount(), 
getMinimumChillModeNodes());
-    }
-    if (inManualChillMode.get()) {
-      return "Out of startup chill mode, but in manual chill mode." +
-          String.format(" %d nodes have reported in.",
-              nodeStateManager.getTotalNodeCount());
-    }
-    return "Out of chill mode." +
-        String.format(" %d nodes have reported in.",
-            nodeStateManager.getTotalNodeCount());
-  }
-
-  /**
-   * Forcefully exits the chill mode even if we have not met the minimum
-   * criteria of exiting the chill mode. This will exit from both startup
-   * and manual chill mode.
-   */
-  @Override
-  public void forceExitChillMode() {
-    if(inStartupChillMode.get()) {
-      LOG.info("Leaving startup chill mode.");
-      inStartupChillMode.set(false);
-    }
-    if(inManualChillMode.get()) {
-      LOG.info("Leaving manual chill mode.");
-      inManualChillMode.set(false);
-    }
-  }
-
-  /**
-   * Puts the node manager into manual chill mode.
-   */
-  @Override
-  public void enterChillMode() {
-    LOG.info("Entering manual chill mode.");
-    inManualChillMode.set(true);
-  }
-
-  /**
-   * Brings node manager out of manual chill mode.
-   */
-  @Override
-  public void exitChillMode() {
-    LOG.info("Leaving manual chill mode.");
-    inManualChillMode.set(false);
-  }
-
-  /**
-   * Returns true if node manager is out of chill mode, else false.
-   * @return true if out of chill mode, else false
-   */
-  @Override
-  public boolean isOutOfChillMode() {
-    return !(inStartupChillMode.get() || inManualChillMode.get());
-  }
-
-  /**
-   * Returns the Number of Datanodes by State they are in.
-   *
-   * @return int -- count
-   */
-  @Override
-  public int getNodeCount(NodeState nodestate) {
-    return nodeStateManager.getNodeCount(nodestate);
-  }
-
-  /**
-   * Returns the node state of a specific node.
-   *
-   * @param datanodeDetails - Datanode Details
-   * @return Healthy/Stale/Dead/Unknown.
-   */
-  @Override
-  public NodeState getNodeState(DatanodeDetails datanodeDetails) {
-    try {
-      return nodeStateManager.getNodeState(datanodeDetails);
-    } catch (NodeNotFoundException e) {
-      // TODO: should we throw NodeNotFoundException?
-      return null;
-    }
-  }
-
-
-  private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
-    SCMNodeStat stat;
-    try {
-      stat = nodeStateManager.getNodeStat(dnId);
-    } catch (NodeNotFoundException e) {
-      LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
-          "dead datanode {}", dnId);
-      stat = new SCMNodeStat();
-    }
-
-    if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
-      long totalCapacity = 0;
-      long totalRemaining = 0;
-      long totalScmUsed = 0;
-      List<StorageReportProto> storageReports = nodeReport
-          .getStorageReportList();
-      for (StorageReportProto report : storageReports) {
-        totalCapacity += report.getCapacity();
-        totalRemaining +=  report.getRemaining();
-        totalScmUsed+= report.getScmUsed();
-      }
-      scmStat.subtract(stat);
-      stat.set(totalCapacity, totalScmUsed, totalRemaining);
-      scmStat.add(stat);
-    }
-    nodeStateManager.setNodeStat(dnId, stat);
-  }
-
-  /**
-   * Closes this stream and releases any system resources associated with it. 
If
-   * the stream is already closed then invoking this method has no effect.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override
-  public void close() throws IOException {
-    unregisterMXBean();
-  }
-
-  /**
-   * Gets the version info from SCM.
-   *
-   * @param versionRequest - version Request.
-   * @return - returns SCM version info and other required information needed 
by
-   * datanode.
-   */
-  @Override
-  public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
-    return VersionResponse.newBuilder()
-        .setVersion(this.version.getVersion())
-        .addValue(OzoneConsts.SCM_ID,
-            this.scmManager.getScmStorage().getScmId())
-        .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage()
-            .getClusterID())
-        .build();
-  }
-
-  /**
-   * Register the node if the node finds that it is not registered with any
-   * SCM.
-   *
-   * @param datanodeDetails - Send datanodeDetails with Node info.
-   *                   This function generates and assigns new datanode ID
-   *                   for the datanode. This allows SCM to be run independent
-   *                   of Namenode if required.
-   * @param nodeReport NodeReport.
-   *
-   * @return SCMHeartbeatResponseProto
-   */
-  @Override
-  public RegisteredCommand register(
-      DatanodeDetails datanodeDetails, NodeReportProto nodeReport,
-      PipelineReportsProto pipelineReportsProto) {
-
-    InetAddress dnAddress = Server.getRemoteIp();
-    if (dnAddress != null) {
-      // Mostly called inside an RPC, update ip and peer hostname
-      datanodeDetails.setHostName(dnAddress.getHostName());
-      datanodeDetails.setIpAddress(dnAddress.getHostAddress());
-    }
-    UUID dnId = datanodeDetails.getUuid();
-    try {
-      nodeStateManager.addNode(datanodeDetails);
-      nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
-      if(inStartupChillMode.get() &&
-          nodeStateManager.getTotalNodeCount() >= getMinimumChillModeNodes()) {
-        inStartupChillMode.getAndSet(false);
-        LOG.info("Leaving startup chill mode.");
-      }
-      // Updating Node Report, as registration is successful
-      updateNodeStat(datanodeDetails.getUuid(), nodeReport);
-      LOG.info("Data node with ID: {} Registered.", datanodeDetails.getUuid());
-    } catch (NodeAlreadyExistsException e) {
-      LOG.trace("Datanode is already registered. Datanode: {}",
-          datanodeDetails.toString());
-    }
-    return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
-        .setDatanodeUUID(datanodeDetails.getUuidString())
-        .setClusterID(this.clusterID)
-        .setHostname(datanodeDetails.getHostName())
-        .setIpAddress(datanodeDetails.getIpAddress())
-        .build();
-  }
-
-  /**
-   * Send heartbeat to indicate the datanode is alive and doing well.
-   *
-   * @param datanodeDetails - DatanodeDetailsProto.
-   * @return SCMheartbeat response.
-   * @throws IOException
-   */
-  @Override
-  public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
-    Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
-        "DatanodeDetails.");
-    try {
-      nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
-    } catch (NodeNotFoundException e) {
-      LOG.warn("SCM receive heartbeat from unregistered datanode {}",
-          datanodeDetails);
-      commandQueue.addCommand(datanodeDetails.getUuid(),
-          new ReregisterCommand());
-    }
-    return commandQueue.getCommand(datanodeDetails.getUuid());
-  }
-
-  /**
-   * Process node report.
-   *
-   * @param dnUuid
-   * @param nodeReport
-   */
-  @Override
-  public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
-    this.updateNodeStat(dnUuid, nodeReport);
-  }
-
-  /**
-   * Returns the aggregated node stats.
-   * @return the aggregated node stats.
-   */
-  @Override
-  public SCMNodeStat getStats() {
-    return new SCMNodeStat(this.scmStat);
-  }
-
-  /**
-   * Return a map of node stats.
-   * @return a map of individual node stats (live/stale but not dead).
-   */
-  @Override
-  public Map<UUID, SCMNodeStat> getNodeStats() {
-    return nodeStateManager.getNodeStatsMap();
-  }
-
-  /**
-   * Return the node stat of the specified datanode.
-   * @param datanodeDetails - datanode ID.
-   * @return node stat if it is live/stale, null if it is decommissioned or
-   * doesn't exist.
-   */
-  @Override
-  public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
-    try {
-      return new SCMNodeMetric(
-          nodeStateManager.getNodeStat(datanodeDetails.getUuid()));
-    } catch (NodeNotFoundException e) {
-      LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}",
-          datanodeDetails.getUuid());
-      return null;
-    }
-  }
-
-  @Override
-  public Map<String, Integer> getNodeCount() {
-    Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
-    for(NodeState state : NodeState.values()) {
-      nodeCountMap.put(state.toString(), getNodeCount(state));
-    }
-    return nodeCountMap;
-  }
-
-  /**
-   * Get set of pipelines a datanode is part of.
-   * @param dnId - datanodeID
-   * @return Set of PipelineID
-   */
-  @Override
-  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
-    return nodeStateManager.getPipelineByDnID(dnId);
-  }
-
-
-  /**
-   * Add pipeline information in the NodeManager.
-   * @param pipeline - Pipeline to be added
-   */
-  @Override
-  public void addPipeline(Pipeline pipeline) {
-    nodeStateManager.addPipeline(pipeline);
-  }
-
-  /**
-   * Remove a pipeline information from the NodeManager.
-   * @param pipeline - Pipeline to be removed
-   */
-  @Override
-  public void removePipeline(Pipeline pipeline) {
-    nodeStateManager.removePipeline(pipeline);
-  }
-
-  /**
-   * Update set of containers available on a datanode.
-   * @param uuid - DatanodeID
-   * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
-   */
-  @Override
-  public void setContainersForDatanode(UUID uuid,
-      Set<ContainerID> containerIds) throws SCMException {
-    nodeStateManager.setContainersForDatanode(uuid, containerIds);
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  @Override
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
-    return nodeStateManager.processContainerReport(uuid, containerIds);
-  }
-
-  /**
-   * Return set of containerIDs available on a datanode.
-   * @param uuid - DatanodeID
-   * @return - set of containerIDs
-   */
-  @Override
-  public Set<ContainerID> getContainers(UUID uuid) {
-    return nodeStateManager.getContainers(uuid);
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  @Override
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
-    nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs);
-  }
-
-  // TODO:
-  // Since datanode commands are added through event queue, onMessage method
-  // should take care of adding commands to command queue.
-  // Refactor and remove all the usage of this method and delete this method.
-  @Override
-  public void addDatanodeCommand(UUID dnId, SCMCommand command) {
-    this.commandQueue.addCommand(dnId, command);
-  }
-
-  /**
-   * This method is called by EventQueue whenever someone adds a new
-   * DATANODE_COMMAND to the Queue.
-   *
-   * @param commandForDatanode DatanodeCommand
-   * @param ignored publisher
-   */
-  @Override
-  public void onMessage(CommandForDatanode commandForDatanode,
-      EventPublisher ignored) {
-    addDatanodeCommand(commandForDatanode.getDatanodeId(),
-        commandForDatanode.getCommand());
-  }
-
-  /**
-   * Update the node stats and cluster storage stats in this SCM Node Manager.
-   *
-   * @param dnUuid datanode uuid.
-   */
-  @Override
-  public void processDeadNode(UUID dnUuid) {
-    try {
-      SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid);
-      if (stat != null) {
-        LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
-        scmStat.subtract(stat);
-        stat.set(0, 0, 0);
-      }
-    } catch (NodeNotFoundException e) {
-      LOG.warn("Can't update stats based on message of dead Datanode {}, it"
-          + " doesn't exist or decommissioned already.", dnUuid);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
deleted file mode 100644
index 32ecbad..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
-
-import java.util.Set;
-import java.util.UUID;
-
-/**
- *
- * This is the JMX management interface for node manager information.
- */
[email protected]
-public interface SCMNodeStorageStatMXBean {
-  /**
-   * Get the capacity of the dataNode.
-   * @param datanodeID Datanode Id
-   * @return long
-   */
-  long getCapacity(UUID datanodeID);
-
-  /**
-   * Returns the remaining space of a Datanode.
-   * @param datanodeId Datanode Id
-   * @return long
-   */
-  long getRemainingSpace(UUID datanodeId);
-
-
-  /**
-   * Returns used space in bytes of a Datanode.
-   * @return long
-   */
-  long getUsedSpace(UUID datanodeId);
-
-  /**
-   * Returns the total capacity of all dataNodes.
-   * @return long
-   */
-  long getTotalCapacity();
-
-  /**
-   * Returns the total Used Space in all Datanodes.
-   * @return long
-   */
-  long getTotalSpaceUsed();
-
-  /**
-   * Returns the total Remaining Space in all Datanodes.
-   * @return long
-   */
-  long getTotalFreeSpace();
-
-  /**
-   * Returns the set of disks for a given Datanode.
-   * @return set of storage volumes
-   */
-  Set<StorageLocationReport> getStorageVolumes(UUID datanodeId);
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
deleted file mode 100644
index 1b0e5b5..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.StorageReportProto;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
-import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_SUCH_DATANODE;
-
-/**
- * This data structure maintains the disk space capacity, disk usage and free
- * space availability per Datanode.
- * This information is built from the DN node reports.
- */
-public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
-  static final Logger LOG =
-      LoggerFactory.getLogger(SCMNodeStorageStatMap.class);
-
-  private final double warningUtilizationThreshold;
-  private final double criticalUtilizationThreshold;
-
-  private final Map<UUID, Set<StorageLocationReport>> scmNodeStorageReportMap;
-  // NodeStorageInfo MXBean
-  private ObjectName scmNodeStorageInfoBean;
-  /**
-   * constructs the scmNodeStorageReportMap object.
-   */
-  public SCMNodeStorageStatMap(OzoneConfiguration conf) {
-    // scmNodeStorageReportMap = new ConcurrentHashMap<>();
-    scmNodeStorageReportMap = new ConcurrentHashMap<>();
-    warningUtilizationThreshold = conf.getDouble(
-        OzoneConfigKeys.
-            HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD,
-        OzoneConfigKeys.
-            HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD_DEFAULT);
-    criticalUtilizationThreshold = conf.getDouble(
-        OzoneConfigKeys.
-            HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD,
-        OzoneConfigKeys.
-            HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT);
-  }
-
-  /**
-   * Enum that Describes what we should do at various thresholds.
-   */
-  public enum UtilizationThreshold {
-    NORMAL, WARN, CRITICAL;
-  }
-
-  /**
-   * Returns true if this a datanode that is already tracked by
-   * scmNodeStorageReportMap.
-   *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
-   */
-  public boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return scmNodeStorageReportMap.containsKey(datanodeID);
-  }
-
-  public List<UUID> getDatanodeList(
-      UtilizationThreshold threshold) {
-    return scmNodeStorageReportMap.entrySet().stream().filter(
-        entry -> (isThresholdReached(threshold,
-            getScmUsedratio(getUsedSpace(entry.getKey()),
-                getCapacity(entry.getKey())))))
-        .map(Map.Entry::getKey)
-        .collect(Collectors.toList());
-  }
-
-
-
-  /**
-   * Insert a new datanode into Node2Container Map.
-   *
-   * @param datanodeID -- Datanode UUID
-   * @param report - set if StorageReports.
-   */
-  public void insertNewDatanode(UUID datanodeID,
-      Set<StorageLocationReport> report) throws SCMException {
-    Preconditions.checkNotNull(report);
-    Preconditions.checkState(report.size() != 0);
-    Preconditions.checkNotNull(datanodeID);
-    synchronized (scmNodeStorageReportMap) {
-      if (isKnownDatanode(datanodeID)) {
-        throw new SCMException("Node already exists in the map",
-            DUPLICATE_DATANODE);
-      }
-      scmNodeStorageReportMap.putIfAbsent(datanodeID, report);
-    }
-  }
-
-  //TODO: This should be called once SCMNodeManager gets Started.
-  private void registerMXBean() {
-    this.scmNodeStorageInfoBean = MBeans.register("StorageContainerManager",
-        "scmNodeStorageInfo", this);
-  }
-
-  //TODO: Unregister call should happen as a part of SCMNodeManager shutdown.
-  private void unregisterMXBean() {
-    if(this.scmNodeStorageInfoBean != null) {
-      MBeans.unregister(this.scmNodeStorageInfoBean);
-      this.scmNodeStorageInfoBean = null;
-    }
-  }
-  /**
-   * Updates the Container list of an existing DN.
-   *
-   * @param datanodeID - UUID of DN.
-   * @param report - set of Storage Reports for the Datanode.
-   * @throws SCMException - if we don't know about this datanode, for new DN
-   *                        use addDatanodeInContainerMap.
-   */
-  public void updateDatanodeMap(UUID datanodeID,
-      Set<StorageLocationReport> report) throws SCMException {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(report);
-    Preconditions.checkState(report.size() != 0);
-    synchronized (scmNodeStorageReportMap) {
-      if (!scmNodeStorageReportMap.containsKey(datanodeID)) {
-        throw new SCMException("No such datanode", NO_SUCH_DATANODE);
-      }
-      scmNodeStorageReportMap.put(datanodeID, report);
-    }
-  }
-
-  public StorageReportResult processNodeReport(UUID datanodeID,
-      StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport)
-      throws IOException {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(nodeReport);
-
-    long totalCapacity = 0;
-    long totalRemaining = 0;
-    long totalScmUsed = 0;
-    Set<StorageLocationReport> storagReportSet = new HashSet<>();
-    Set<StorageLocationReport> fullVolumeSet = new HashSet<>();
-    Set<StorageLocationReport> failedVolumeSet = new HashSet<>();
-    List<StorageReportProto>
-        storageReports = nodeReport.getStorageReportList();
-    for (StorageReportProto report : storageReports) {
-      StorageLocationReport storageReport =
-          StorageLocationReport.getFromProtobuf(report);
-      storagReportSet.add(storageReport);
-      if (report.hasFailed() && report.getFailed()) {
-        failedVolumeSet.add(storageReport);
-      } else if (isThresholdReached(UtilizationThreshold.CRITICAL,
-          getScmUsedratio(report.getScmUsed(), report.getCapacity()))) {
-        fullVolumeSet.add(storageReport);
-      }
-      totalCapacity += report.getCapacity();
-      totalRemaining += report.getRemaining();
-      totalScmUsed += report.getScmUsed();
-    }
-
-    if (!isKnownDatanode(datanodeID)) {
-      insertNewDatanode(datanodeID, storagReportSet);
-    } else {
-      updateDatanodeMap(datanodeID, storagReportSet);
-    }
-    if (isThresholdReached(UtilizationThreshold.CRITICAL,
-        getScmUsedratio(totalScmUsed, totalCapacity))) {
-      LOG.warn("Datanode {} is out of storage space. Capacity: {}, Used: {}",
-          datanodeID, totalCapacity, totalScmUsed);
-      return StorageReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.DATANODE_OUT_OF_SPACE)
-          .setFullVolumeSet(fullVolumeSet).setFailedVolumeSet(failedVolumeSet)
-          .build();
-    }
-    if (isThresholdReached(UtilizationThreshold.WARN,
-        getScmUsedratio(totalScmUsed, totalCapacity))) {
-      LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}",
-          datanodeID, totalCapacity, totalScmUsed);
-    }
-
-    if (failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) {
-      return StorageReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.STORAGE_OUT_OF_SPACE)
-          .setFullVolumeSet(fullVolumeSet).build();
-    }
-
-    if (!failedVolumeSet.isEmpty() && fullVolumeSet.isEmpty()) {
-      return StorageReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.FAILED_STORAGE)
-          .setFailedVolumeSet(failedVolumeSet).build();
-    }
-    if (!failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) {
-      return StorageReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE)
-          .setFailedVolumeSet(failedVolumeSet).setFullVolumeSet(fullVolumeSet)
-          .build();
-    }
-    return StorageReportResult.ReportResultBuilder.newBuilder()
-        .setStatus(ReportStatus.ALL_IS_WELL).build();
-  }
-
-  private boolean isThresholdReached(UtilizationThreshold threshold,
-      double scmUsedratio) {
-    switch (threshold) {
-    case NORMAL:
-      return scmUsedratio < warningUtilizationThreshold;
-    case WARN:
-      return scmUsedratio >= warningUtilizationThreshold
-          && scmUsedratio < criticalUtilizationThreshold;
-    case CRITICAL:
-      return scmUsedratio >= criticalUtilizationThreshold;
-    default:
-      throw new RuntimeException("Unknown UtilizationThreshold value");
-    }
-  }
-
-  @Override
-  public long getCapacity(UUID dnId) {
-    long capacity = 0;
-    Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
-    for (StorageLocationReport report : reportSet) {
-      capacity += report.getCapacity();
-    }
-    return capacity;
-  }
-
-  @Override
-  public long getRemainingSpace(UUID dnId) {
-    long remaining = 0;
-    Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
-    for (StorageLocationReport report : reportSet) {
-      remaining += report.getRemaining();
-    }
-    return remaining;
-  }
-
-  @Override
-  public long getUsedSpace(UUID dnId) {
-    long scmUsed = 0;
-    Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
-    for (StorageLocationReport report : reportSet) {
-      scmUsed += report.getScmUsed();
-    }
-    return scmUsed;
-  }
-
-  @Override
-  public long getTotalCapacity() {
-    long capacity = 0;
-    Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
-    for (UUID id : dnIdSet) {
-      capacity += getCapacity(id);
-    }
-    return capacity;
-  }
-
-  @Override
-  public long getTotalSpaceUsed() {
-    long scmUsed = 0;
-    Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
-    for (UUID id : dnIdSet) {
-      scmUsed += getUsedSpace(id);
-    }
-    return scmUsed;
-  }
-
-  @Override
-  public long getTotalFreeSpace() {
-    long remaining = 0;
-    Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
-    for (UUID id : dnIdSet) {
-      remaining += getRemainingSpace(id);
-    }
-    return remaining;
-  }
-
-  /**
-   * removes the dataNode from scmNodeStorageReportMap.
-   * @param datanodeID
-   * @throws SCMException in case the dataNode is not found in the map.
-   */
-  public void removeDatanode(UUID datanodeID) throws SCMException {
-    Preconditions.checkNotNull(datanodeID);
-    synchronized (scmNodeStorageReportMap) {
-      if (!scmNodeStorageReportMap.containsKey(datanodeID)) {
-        throw new SCMException("No such datanode", NO_SUCH_DATANODE);
-      }
-      scmNodeStorageReportMap.remove(datanodeID);
-    }
-  }
-
-  /**
-   * Returns the set of storage volumes for a Datanode.
-   * @param  datanodeID
-   * @return set of storage volumes.
-   */
-
-  @Override
-  public Set<StorageLocationReport> getStorageVolumes(UUID datanodeID) {
-    return scmNodeStorageReportMap.get(datanodeID);
-  }
-
-
-  /**
-   * Truncate to 4 digits since uncontrolled precision is some times
-   * counter intuitive to what users expect.
-   * @param value - double.
-   * @return double.
-   */
-  private double truncateDecimals(double value) {
-    final int multiplier = 10000;
-    return (double) ((long) (value * multiplier)) / multiplier;
-  }
-
-  /**
-   * get the scmUsed ratio.
-   */
-  public  double getScmUsedratio(long scmUsed, long capacity) {
-    double scmUsedRatio =
-        truncateDecimals(scmUsed / (double) capacity);
-    return scmUsedRatio;
-  }
-  /**
-   * Results possible from processing a Node report by
-   * Node2ContainerMapper.
-   */
-  public enum ReportStatus {
-    ALL_IS_WELL,
-    DATANODE_OUT_OF_SPACE,
-    STORAGE_OUT_OF_SPACE,
-    FAILED_STORAGE,
-    FAILED_AND_OUT_OF_SPACE_STORAGE
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
deleted file mode 100644
index 48939f1..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-/**
- * Handles Stale node event.
- */
-public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
-
-  private final PipelineSelector pipelineSelector;
-
-  public StaleNodeHandler(PipelineSelector pipelineSelector) {
-    this.pipelineSelector = pipelineSelector;
-  }
-
-  @Override
-  public void onMessage(DatanodeDetails datanodeDetails,
-                        EventPublisher publisher) {
-    pipelineSelector.handleStaleNode(datanodeDetails);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
deleted file mode 100644
index 0b63ceb..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
+++ /dev/null
@@ -1,87 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
-
-import java.util.Set;
-
-/**
- * A Container Report gets processsed by the Node2Container and returns the
- * Report Result class.
- */
-public class StorageReportResult {
-  private SCMNodeStorageStatMap.ReportStatus status;
-  private Set<StorageLocationReport> fullVolumes;
-  private Set<StorageLocationReport> failedVolumes;
-
-  StorageReportResult(SCMNodeStorageStatMap.ReportStatus status,
-      Set<StorageLocationReport> fullVolumes,
-      Set<StorageLocationReport> failedVolumes) {
-    this.status = status;
-    this.fullVolumes = fullVolumes;
-    this.failedVolumes = failedVolumes;
-  }
-
-  public SCMNodeStorageStatMap.ReportStatus getStatus() {
-    return status;
-  }
-
-  public Set<StorageLocationReport> getFullVolumes() {
-    return fullVolumes;
-  }
-
-  public Set<StorageLocationReport> getFailedVolumes() {
-    return failedVolumes;
-  }
-
-  static class ReportResultBuilder {
-    private SCMNodeStorageStatMap.ReportStatus status;
-    private Set<StorageLocationReport> fullVolumes;
-    private Set<StorageLocationReport> failedVolumes;
-
-    static ReportResultBuilder newBuilder() {
-      return new ReportResultBuilder();
-    }
-
-    public ReportResultBuilder setStatus(
-        SCMNodeStorageStatMap.ReportStatus newstatus) {
-      this.status = newstatus;
-      return this;
-    }
-
-    public ReportResultBuilder setFullVolumeSet(
-        Set<StorageLocationReport> fullVolumesSet) {
-      this.fullVolumes = fullVolumesSet;
-      return this;
-    }
-
-    public ReportResultBuilder setFailedVolumeSet(
-        Set<StorageLocationReport> failedVolumesSet) {
-      this.failedVolumes = failedVolumesSet;
-      return this;
-    }
-
-    StorageReportResult build() {
-      return new StorageReportResult(status, fullVolumes, failedVolumes);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
deleted file mode 100644
index d6a8ad0..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.node;
-
-/**
- * The node package deals with node management.
- * <p/>
- * The node manager takes care of node registrations, removal of node and
- * handling of heartbeats.
- * <p/>
- * The node manager maintains statistics that gets send as part of
- * heartbeats.
- * <p/>
- * The container manager polls the node manager to learn the state of
- * datanodes  that it is interested in.
- * <p/>
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
deleted file mode 100644
index 9625f81..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .NO_SUCH_DATANODE;
-
-/**
- * This data structure maintains the list of containers that is on a datanode.
- * This information is built from the DN container reports.
- */
-public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> {
-
-  /**
-   * Constructs a Node2ContainerMap Object.
-   */
-  public Node2ContainerMap() {
-    super();
-  }
-
-  /**
-   * Returns null if there no containers associated with this datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of containers or Null.
-   */
-  public Set<ContainerID> getContainers(UUID datanode) {
-    return getObjects(datanode);
-  }
-
-  /**
-   * Insert a new datanode into Node2Container Map.
-   *
-   * @param datanodeID   -- Datanode UUID
-   * @param containerIDs - List of ContainerIDs.
-   */
-  public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
-      throws SCMException {
-    super.insertNewDatanode(datanodeID, containerIDs);
-  }
-
-  /**
-   * Updates the Container list of an existing DN.
-   *
-   * @param datanodeID - UUID of DN.
-   * @param containers - Set of Containers tht is present on DN.
-   * @throws SCMException - if we don't know about this datanode, for new DN
-   *                        use addDatanodeInContainerMap.
-   */
-  public void setContainersForDatanode(UUID datanodeID,
-      Set<ContainerID> containers) throws SCMException {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(containers);
-    if (dn2ObjectMap
-        .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
-        == null) {
-      throw new SCMException("No such datanode", NO_SUCH_DATANODE);
-    }
-  }
-
-  @VisibleForTesting
-  public int size() {
-    return dn2ObjectMap.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
deleted file mode 100644
index e49a79c..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-
-import java.util.UUID;
-import java.util.Set;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.HashSet;
-import java.util.Collections;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
-
-/**
- * This data structure maintains the list of containers that is on a datanode.
- * This information is built from the DN container reports.
- */
-public class Node2ObjectsMap<T> {
-  protected final Map<UUID, Set<T>> dn2ObjectMap;
-
-  /**
-   * Constructs a Node2ContainerMap Object.
-   */
-  public Node2ObjectsMap() {
-    dn2ObjectMap = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Returns true if this a datanode that is already tracked by
-   * Node2ContainerMap.
-   *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
-   */
-  public boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return dn2ObjectMap.containsKey(datanodeID);
-  }
-
-  /**
-   * Insert a new datanode into Node2Container Map.
-   *
-   * @param datanodeID   -- Datanode UUID
-   * @param containerIDs - List of ContainerIDs.
-   */
-  public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
-      throws SCMException {
-    Preconditions.checkNotNull(containerIDs);
-    Preconditions.checkNotNull(datanodeID);
-    if (dn2ObjectMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
-        != null) {
-      throw new SCMException("Node already exists in the map",
-          DUPLICATE_DATANODE);
-    }
-  }
-
-  /**
-   * Removes datanode Entry from the map.
-   *
-   * @param datanodeID - Datanode ID.
-   */
-  void removeDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
-  }
-
-  /**
-   * Returns null if there no containers associated with this datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of containers or Null.
-   */
-  Set<T> getObjects(UUID datanode) {
-    Preconditions.checkNotNull(datanode);
-    final Set<T> s = dn2ObjectMap.get(datanode);
-    return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
-  }
-
-  public ReportResult.ReportResultBuilder<T> newBuilder() {
-    return new ReportResult.ReportResultBuilder<>();
-  }
-
-  public ReportResult<T> processReport(UUID datanodeID, Set<T> objects) {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(objects);
-
-    if (!isKnownDatanode(datanodeID)) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.NEW_DATANODE_FOUND)
-          .setNewEntries(objects)
-          .build();
-    }
-
-    // Conditions like Zero length containers should be handled by removeAll.
-    Set<T> currentSet = dn2ObjectMap.get(datanodeID);
-    TreeSet<T> newObjects = new TreeSet<>(objects);
-    newObjects.removeAll(currentSet);
-
-    TreeSet<T> missingObjects = new TreeSet<>(currentSet);
-    missingObjects.removeAll(objects);
-
-    if (newObjects.isEmpty() && missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
-          .build();
-    }
-
-    if (newObjects.isEmpty() && !missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES)
-          .setMissingEntries(missingObjects)
-          .build();
-    }
-
-    if (!newObjects.isEmpty() && missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND)
-          .setNewEntries(newObjects)
-          .build();
-    }
-
-    if (!newObjects.isEmpty() && !missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND)
-          .setNewEntries(newObjects)
-          .setMissingEntries(missingObjects)
-          .build();
-    }
-
-    // default status & Make compiler happy
-    return newBuilder()
-        .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
-        .build();
-  }
-
-  @VisibleForTesting
-  public int size() {
-    return dn2ObjectMap.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
deleted file mode 100644
index 87f2222..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * This data structure maintains the list of pipelines which the given 
datanode is a part of. This
- * information will be added whenever a new pipeline allocation happens.
- *
- * <p>TODO: this information needs to be regenerated from pipeline reports on 
SCM restart
- */
-public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
-
-  /** Constructs a Node2PipelineMap Object. */
-  public Node2PipelineMap() {
-    super();
-  }
-
-  /**
-   * Returns null if there no pipelines associated with this datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of pipelines or Null.
-   */
-  public Set<PipelineID> getPipelines(UUID datanode) {
-    return getObjects(datanode);
-  }
-
-  /**
-   * Adds a pipeline entry to a given dataNode in the map.
-   *
-   * @param pipeline Pipeline to be added
-   */
-  public synchronized void addPipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
-      UUID dnId = details.getUuid();
-      dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
-          .add(pipeline.getId());
-    }
-  }
-
-  public synchronized void removePipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
-      UUID dnId = details.getUuid();
-      dn2ObjectMap.computeIfPresent(dnId,
-          (k, v) -> {
-            v.remove(pipeline.getId());
-            return v;
-          });
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
deleted file mode 100644
index aa5c382..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-/**
- * This exception represents that there is already a node added to NodeStateMap
- * with same UUID.
- */
-public class NodeAlreadyExistsException extends NodeException {
-
-  /**
-   * Constructs an {@code NodeAlreadyExistsException} with {@code null}
-   * as its error detail message.
-   */
-  public NodeAlreadyExistsException() {
-    super();
-  }
-
-  /**
-   * Constructs an {@code NodeAlreadyExistsException} with the specified
-   * detail message.
-   *
-   * @param message
-   *        The detail message (which is saved for later retrieval
-   *        by the {@link #getMessage()} method)
-   */
-  public NodeAlreadyExistsException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
deleted file mode 100644
index c67b55d..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-/**
- * This exception represents all node related exceptions in NodeStateMap.
- */
-public class NodeException extends Exception {
-
-  /**
-   * Constructs an {@code NodeException} with {@code null}
-   * as its error detail message.
-   */
-  public NodeException() {
-    super();
-  }
-
-  /**
-   * Constructs an {@code NodeException} with the specified
-   * detail message.
-   *
-   * @param message
-   *        The detail message (which is saved for later retrieval
-   *        by the {@link #getMessage()} method)
-   */
-  public NodeException(String message) {
-    super(message);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to