Repository: hadoop
Updated Branches:
  refs/heads/trunk 81072d5e3 -> a39296260


HDDS-561. Move Node2ContainerMap and Node2PipelineMap to NodeManager. 
Contributed by Lokesh Jain.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3929626
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3929626
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3929626

Branch: refs/heads/trunk
Commit: a39296260f8c77f3808e27b43c623a0edebe4a17
Parents: 81072d5
Author: Nanda kumar <na...@apache.org>
Authored: Tue Oct 2 19:47:15 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Tue Oct 2 19:47:44 2018 +0530

----------------------------------------------------------------------
 .../scm/container/ContainerReportHandler.java   | 16 ++--
 .../hadoop/hdds/scm/node/DeadNodeHandler.java   | 11 +--
 .../hadoop/hdds/scm/node/NewNodeHandler.java    |  9 +-
 .../hadoop/hdds/scm/node/NodeManager.java       | 61 ++++++++++++++
 .../hadoop/hdds/scm/node/NodeStateManager.java  | 88 +++++++++++++++++++-
 .../hadoop/hdds/scm/node/SCMNodeManager.java    | 83 ++++++++++++++++++
 .../hdds/scm/node/SCMNodeStorageStatMap.java    |  2 +-
 .../hadoop/hdds/scm/node/StaleNodeHandler.java  |  9 +-
 .../hdds/scm/node/states/Node2ContainerMap.java |  2 +-
 .../hdds/scm/node/states/Node2PipelineMap.java  | 75 +++++++++++++++++
 .../hdds/scm/pipelines/Node2PipelineMap.java    | 75 -----------------
 .../hdds/scm/pipelines/PipelineSelector.java    | 18 ++--
 .../scm/server/StorageContainerManager.java     | 14 ++--
 .../hdds/scm/container/MockNodeManager.java     | 87 +++++++++++++++++++
 .../container/TestContainerReportHandler.java   | 14 ++--
 .../hdds/scm/node/TestDeadNodeHandler.java      | 27 ++----
 .../testutils/ReplicationNodeManagerMock.java   | 82 ++++++++++++++++++
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |  2 +-
 .../hdds/scm/pipeline/TestPipelineClose.java    |  2 +-
 19 files changed, 520 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
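
For readers skimming the diff below: the commit consolidates the per-node container and pipeline bookkeeping behind the NodeManager interface. A minimal sketch of the resulting call surface, assuming the NodeManager signatures introduced in this commit; the wrapper class and method names here are hypothetical, illustration only:

import java.util.Collections;
import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;

// Hypothetical caller; only the NodeManager methods are from the commit.
// Handlers register a node once, then diff and persist each container
// report through the same interface.
final class NodeManagerUsageSketch {

  static void onNewNode(NodeManager nodeManager, UUID dnId)
      throws SCMException {
    // New datanodes start with an empty container set.
    nodeManager.addDatanodeInContainerMap(dnId, Collections.emptySet());
  }

  static void onContainerReport(NodeManager nodeManager, UUID dnId,
      Set<ContainerID> reported) throws SCMException {
    // Diff against the last known set, then store the new set.
    ReportResult<ContainerID> result =
        nodeManager.processContainerReport(dnId, reported);
    nodeManager.setContainersForDatanode(dnId, reported);
    // result.getMissingEntries() lists replicas the node stopped reporting.
  }
}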


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 3f156de..71935f0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -29,7 +29,7 @@ import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatu
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -48,7 +48,7 @@ public class ContainerReportHandler implements
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerReportHandler.class);
 
-  private final Node2ContainerMap node2ContainerMap;
+  private final NodeManager nodeManager;
 
   private final Mapping containerMapping;
 
@@ -57,14 +57,14 @@ public class ContainerReportHandler implements
   private ReplicationActivityStatus replicationStatus;
 
   public ContainerReportHandler(Mapping containerMapping,
-      Node2ContainerMap node2ContainerMap,
+      NodeManager nodeManager,
       ReplicationActivityStatus replicationActivityStatus) {
     Preconditions.checkNotNull(containerMapping);
-    Preconditions.checkNotNull(node2ContainerMap);
+    Preconditions.checkNotNull(nodeManager);
     Preconditions.checkNotNull(replicationActivityStatus);
     this.containerStateManager = containerMapping.getStateManager();
+    this.nodeManager = nodeManager;
     this.containerMapping = containerMapping;
-    this.node2ContainerMap = node2ContainerMap;
     this.replicationStatus = replicationActivityStatus;
   }
 
@@ -89,11 +89,11 @@ public class ContainerReportHandler implements
           .map(ContainerID::new)
           .collect(Collectors.toSet());
 
-      ReportResult<ContainerID> reportResult = node2ContainerMap
-          .processReport(datanodeOrigin.getUuid(), containerIds);
+      ReportResult<ContainerID> reportResult = nodeManager
+          .processContainerReport(datanodeOrigin.getUuid(), containerIds);
 
       //we have the report, so we can update the states for the next iteration.
-      node2ContainerMap
+      nodeManager
           .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
 
       for (ContainerID containerID : reportResult.getMissingEntries()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 253b3ec..17edf9e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -26,7 +26,6 @@ import 
org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -38,8 +37,6 @@ import org.slf4j.LoggerFactory;
  */
 public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
 
-  private final Node2ContainerMap node2ContainerMap;
-
   private final ContainerStateManager containerStateManager;
 
   private final NodeManager nodeManager;
@@ -47,10 +44,8 @@ public class DeadNodeHandler implements 
EventHandler<DatanodeDetails> {
   private static final Logger LOG =
       LoggerFactory.getLogger(DeadNodeHandler.class);
 
-  public DeadNodeHandler(
-      Node2ContainerMap node2ContainerMap,
-      ContainerStateManager containerStateManager, NodeManager nodeManager) {
-    this.node2ContainerMap = node2ContainerMap;
+  public DeadNodeHandler(NodeManager nodeManager,
+      ContainerStateManager containerStateManager) {
     this.containerStateManager = containerStateManager;
     this.nodeManager = nodeManager;
   }
@@ -61,7 +56,7 @@ public class DeadNodeHandler implements 
EventHandler<DatanodeDetails> {
     nodeManager.processDeadNode(datanodeDetails.getUuid());
 
     Set<ContainerID> containers =
-        node2ContainerMap.getContainers(datanodeDetails.getUuid());
+        nodeManager.getContainers(datanodeDetails.getUuid());
     if (containers == null) {
       LOG.info("There's no containers in dead datanode {}, no replica will be"
           + " removed from the in-memory state.", datanodeDetails.getUuid());

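In the dead-node path above, getContainers can legitimately return null for a datanode the map has never seen, which is why the handler logs and bails out. A minimal null-safe sketch of that lookup; the helper class and method are hypothetical, getContainers is from the diff:

import java.util.Collections;
import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.node.NodeManager;

// Sketch of the lookup above: getContainers may return null for an
// unknown node, so the handler's null check stays necessary.
final class DeadNodeLookupSketch {
  static Set<ContainerID> containersOf(NodeManager nodeManager, UUID dnId) {
    Set<ContainerID> containers = nodeManager.getContainers(dnId);
    return containers == null ? Collections.emptySet() : containers;
  }
}
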
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index 79b75a5..780aa2b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -31,17 +30,17 @@ import java.util.Collections;
  */
 public class NewNodeHandler implements EventHandler<DatanodeDetails> {
 
-  private final Node2ContainerMap node2ContainerMap;
+  private final NodeManager nodeManager;
 
-  public NewNodeHandler(Node2ContainerMap node2ContainerMap) {
-    this.node2ContainerMap = node2ContainerMap;
+  public NewNodeHandler(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
     try {
-      node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
+      nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(),
           Collections.emptySet());
     } catch (SCMException e) {
       // TODO: log exception message.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index fad3ee3..0dc1a0c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -18,11 +18,16 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -31,6 +36,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -134,6 +140,61 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
   NodeState getNodeState(DatanodeDetails datanodeDetails);
 
   /**
+   * Get set of pipelines a datanode is part of.
+   * @param dnId - datanodeID
+   * @return Set of PipelineID
+   */
+  Set<PipelineID> getPipelineByDnID(UUID dnId);
+
+  /**
+   * Add pipeline information to the NodeManager.
+   * @param pipeline - Pipeline to be added
+   */
+  void addPipeline(Pipeline pipeline);
+
+  /**
+   * Remove pipeline information from the NodeManager.
+   * @param pipeline - Pipeline to be removed
+   */
+  void removePipeline(Pipeline pipeline);
+
+  /**
+   * Update set of containers available on a datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @throws SCMException - if datanode is not known. For new datanode use
+   *                        addDatanodeInContainerMap call.
+   */
+  void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
+      throws SCMException;
+
+  /**
+   * Process containerReport received from datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @return The result after processing containerReport
+   */
+  ReportResult<ContainerID> processContainerReport(UUID uuid,
+      Set<ContainerID> containerIds);
+
+  /**
+   * Return set of containerIDs available on a datanode.
+   * @param uuid - DatanodeID
+   * @return - set of containerIDs
+   */
+  Set<ContainerID> getContainers(UUID uuid);
+
+  /**
+   * Insert a new datanode with set of containerIDs for containers available
+   * on it.
+   * @param uuid - DatanodeID
+   * @param containerIDs - Set of ContainerIDs
+   * @throws SCMException - if datanode already exists
+   */
+  void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs)
+      throws SCMException;
+
+  /**
    * Add a {@link SCMCommand} to the command queue, which are
    * handled by HB thread asynchronously.
    * @param dnId datanode uuid

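With both maps behind the interface, a single dependency can answer container and pipeline membership for a node. A sketch of a hypothetical consumer, using only the methods added above:

import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.node.NodeManager;

// Hypothetical consumer: one NodeManager dependency answers both
// "which containers?" and "which pipelines?" for a datanode.
final class NodeInventorySketch {
  private final NodeManager nodeManager;

  NodeInventorySketch(NodeManager nodeManager) {
    this.nodeManager = nodeManager;
  }

  boolean isIdle(UUID dnId) {
    Set<ContainerID> containers = nodeManager.getContainers(dnId);
    Set<PipelineID> pipelines = nodeManager.getPipelineByDnID(dnId);
    return (containers == null || containers.isEmpty())
        && (pipelines == null || pipelines.isEmpty());
  }
}
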
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 1f99ffe..88f984b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -24,11 +24,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.node.states.NodeStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.states.*;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.common.statemachine
@@ -87,6 +90,15 @@ public class NodeStateManager implements Runnable, Closeable 
{
    */
   private final NodeStateMap nodeStateMap;
   /**
+   * Maintains the mapping from node to pipelines a node is part of.
+   */
+  private final Node2PipelineMap node2PipelineMap;
+  /**
+   * Maintains the map from node to ContainerIDs for the containers
+   * available on the node.
+   */
+  private final Node2ContainerMap node2ContainerMap;
+  /**
    * Used for publishing node state change events.
    */
   private final EventPublisher eventPublisher;
@@ -118,6 +130,8 @@ public class NodeStateManager implements Runnable, 
Closeable {
    */
   public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
     this.nodeStateMap = new NodeStateMap();
+    this.node2PipelineMap = new Node2PipelineMap();
+    this.node2ContainerMap = new Node2ContainerMap();
     this.eventPublisher = eventPublisher;
     this.state2EventMap = new HashMap<>();
     initialiseState2EventMap();
@@ -243,6 +257,14 @@ public class NodeStateManager implements Runnable, 
Closeable {
   }
 
   /**
+   * Adds a pipeline to the node2PipelineMap.
+   * @param pipeline - Pipeline to be added
+   */
+  public void addPipeline(Pipeline pipeline) {
+    node2PipelineMap.addPipeline(pipeline);
+  }
+
+  /**
    * Get information about the node.
    *
    * @param datanodeDetails DatanodeDetails
@@ -353,6 +375,15 @@ public class NodeStateManager implements Runnable, 
Closeable {
   }
 
   /**
+   * Gets the set of PipelineIDs a datanode belongs to.
+   * @param dnId - Datanode ID
+   * @return Set of PipelineID
+   */
+  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
+    return node2PipelineMap.getPipelines(dnId);
+  }
+
+  /**
    * Returns the count of healthy nodes.
    *
    * @return healthy node count
@@ -457,6 +488,57 @@ public class NodeStateManager implements Runnable, 
Closeable {
   }
 
   /**
+   * Removes a pipeline from the node2PipelineMap.
+   * @param pipeline - Pipeline to be removed
+   */
+  public void removePipeline(Pipeline pipeline) {
+    node2PipelineMap.removePipeline(pipeline);
+  }
+  /**
+   * Update set of containers available on a datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @throws SCMException - if datanode is not known. For new datanode use
+   *                        addDatanodeInContainerMap call.
+   */
+  public void setContainersForDatanode(UUID uuid, Set<ContainerID> 
containerIds)
+      throws SCMException {
+    node2ContainerMap.setContainersForDatanode(uuid, containerIds);
+  }
+
+  /**
+   * Process containerReport received from datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @return The result after processing containerReport
+   */
+  public ReportResult<ContainerID> processContainerReport(UUID uuid,
+      Set<ContainerID> containerIds) {
+    return node2ContainerMap.processReport(uuid, containerIds);
+  }
+
+  /**
+   * Return set of containerIDs available on a datanode.
+   * @param uuid - DatanodeID
+   * @return - set of containerIDs
+   */
+  public Set<ContainerID> getContainers(UUID uuid) {
+    return node2ContainerMap.getContainers(uuid);
+  }
+
+  /**
+   * Insert a new datanode with set of containerIDs for containers available
+   * on it.
+   * @param uuid - DatanodeID
+   * @param containerIDs - Set of ContainerIDs
+   * @throws SCMException - if datanode already exists
+   */
+  public void addDatanodeInContainerMap(UUID uuid,
+      Set<ContainerID> containerIDs) throws SCMException {
+    node2ContainerMap.insertNewDatanode(uuid, containerIDs);
+  }
+
+  /**
   * Move Stale or Dead nodes to healthy if we got a heartbeat from them.
   * Move healthy nodes to stale if needed.
   * Move stale nodes to dead if needed.

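Everything NodeStateManager gains in this hunk is pure delegation: it now owns the two maps and forwards one call each, and SCMNodeManager below forwards to NodeStateManager in turn. The shape of that pattern, reduced to a self-contained sketch with entirely hypothetical names:

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only (all names hypothetical): the outer class owns the
// map and exposes it through narrow methods, as NodeStateManager now
// does for Node2ContainerMap and Node2PipelineMap.
final class DelegatingStateSketch {
  private final Map<UUID, Set<String>> node2Objects =
      new ConcurrentHashMap<>();

  Set<String> getObjects(UUID id) {
    return node2Objects.get(id); // pure delegation, no extra logic
  }
}
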
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 620c816..36a6f15 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -21,8 +21,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -59,6 +64,7 @@ import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -471,6 +477,83 @@ public class SCMNodeManager
     return nodeCountMap;
   }
 
+  /**
+   * Get set of pipelines a datanode is part of.
+   * @param dnId - datanodeID
+   * @return Set of PipelineID
+   */
+  @Override
+  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
+    return nodeStateManager.getPipelineByDnID(dnId);
+  }
+
+
+  /**
+   * Add pipeline information to the NodeManager.
+   * @param pipeline - Pipeline to be added
+   */
+  @Override
+  public void addPipeline(Pipeline pipeline) {
+    nodeStateManager.addPipeline(pipeline);
+  }
+
+  /**
+   * Remove pipeline information from the NodeManager.
+   * @param pipeline - Pipeline to be removed
+   */
+  @Override
+  public void removePipeline(Pipeline pipeline) {
+    nodeStateManager.removePipeline(pipeline);
+  }
+
+  /**
+   * Update set of containers available on a datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @throws SCMException - if datanode is not known. For new datanode use
+   *                        addDatanodeInContainerMap call.
+   */
+  @Override
+  public void setContainersForDatanode(UUID uuid,
+      Set<ContainerID> containerIds) throws SCMException {
+    nodeStateManager.setContainersForDatanode(uuid, containerIds);
+  }
+
+  /**
+   * Process containerReport received from datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @return The result after processing containerReport
+   */
+  @Override
+  public ReportResult<ContainerID> processContainerReport(UUID uuid,
+      Set<ContainerID> containerIds) {
+    return nodeStateManager.processContainerReport(uuid, containerIds);
+  }
+
+  /**
+   * Return set of containerIDs available on a datanode.
+   * @param uuid - DatanodeID
+   * @return - set of containerIDs
+   */
+  @Override
+  public Set<ContainerID> getContainers(UUID uuid) {
+    return nodeStateManager.getContainers(uuid);
+  }
+
+  /**
+   * Insert a new datanode with set of containerIDs for containers available
+   * on it.
+   * @param uuid - DatanodeID
+   * @param containerIDs - Set of ContainerIDs
+   * @throws SCMException - if datanode already exists
+   */
+  @Override
+  public void addDatanodeInContainerMap(UUID uuid,
+      Set<ContainerID> containerIDs) throws SCMException {
+    nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs);
+  }
+
   // TODO:
   // Since datanode commands are added through event queue, onMessage method
   // should take care of adding commands to command queue.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
index 6ea83df..1b0e5b5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
@@ -147,7 +147,7 @@ public class SCMNodeStorageStatMap implements 
SCMNodeStorageStatMXBean {
    * @param datanodeID - UUID of DN.
    * @param report - set of Storage Reports for the Datanode.
    * @throws SCMException - if we don't know about this datanode, for new DN
-   *                      use insertNewDatanode.
+   *                        use addDatanodeInContainerMap.
    */
   public void updateDatanodeMap(UUID datanodeID,
       Set<StorageLocationReport> report) throws SCMException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index ddbba82..48939f1 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -19,25 +19,18 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Handles Stale node event.
  */
 public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
-  static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class);
 
-  private final Node2ContainerMap node2ContainerMap;
   private final PipelineSelector pipelineSelector;
 
-  public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
-      PipelineSelector pipelineSelector) {
-    this.node2ContainerMap = node2ContainerMap;
+  public StaleNodeHandler(PipelineSelector pipelineSelector) {
     this.pipelineSelector = pipelineSelector;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
index 549080a..9625f81 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
@@ -70,7 +70,7 @@ public class Node2ContainerMap extends 
Node2ObjectsMap<ContainerID> {
    * @param datanodeID - UUID of DN.
   * @param containers - Set of Containers that are present on DN.
    * @throws SCMException - if we don't know about this datanode, for new DN
-   *                      use insertNewDatanode.
+   *                        use addDatanodeInContainerMap.
    */
   public void setContainersForDatanode(UUID datanodeID,
       Set<ContainerID> containers) throws SCMException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
new file mode 100644
index 0000000..87f2222
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * This data structure maintains the list of pipelines which the given 
datanode is a part of. This
+ * information will be added whenever a new pipeline allocation happens.
+ *
+ * <p>TODO: this information needs to be regenerated from pipeline reports on 
SCM restart
+ */
+public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
+
+  /** Constructs a Node2PipelineMap Object. */
+  public Node2PipelineMap() {
+    super();
+  }
+
+  /**
+   * Returns null if there are no pipelines associated with this datanode ID.
+   *
+   * @param datanode - UUID
+   * @return Set of pipelines or Null.
+   */
+  public Set<PipelineID> getPipelines(UUID datanode) {
+    return getObjects(datanode);
+  }
+
+  /**
+   * Adds a pipeline entry to a given dataNode in the map.
+   *
+   * @param pipeline Pipeline to be added
+   */
+  public synchronized void addPipeline(Pipeline pipeline) {
+    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
+      UUID dnId = details.getUuid();
+      dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
+          .add(pipeline.getId());
+    }
+  }
+
+  public synchronized void removePipeline(Pipeline pipeline) {
+    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
+      UUID dnId = details.getUuid();
+      dn2ObjectMap.computeIfPresent(dnId,
+          (k, v) -> {
+            v.remove(pipeline.getId());
+            return v;
+          });
+    }
+  }
+}

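The class itself is unchanged by the move; only its package changes, from org.apache.hadoop.hdds.scm.pipelines to org.apache.hadoop.hdds.scm.node.states (the old copy is deleted below). A small usage sketch under that assumption; the Pipeline is assumed to be built elsewhere, and the wrapper names are hypothetical:

import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;

// Usage sketch for the relocated class; pipeline construction is not
// part of this diff.
final class Node2PipelineMapSketch {
  static void track(Node2PipelineMap map, Pipeline pipeline, UUID memberId) {
    map.addPipeline(pipeline);        // indexes every datanode in the pipeline
    Set<PipelineID> ids = map.getPipelines(memberId); // null if none tracked
    map.removePipeline(pipeline);     // unlinks it from each member again
  }
}
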
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
deleted file mode 100644
index 87f2222..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * This data structure maintains the list of pipelines which the given 
datanode is a part of. This
- * information will be added whenever a new pipeline allocation happens.
- *
- * <p>TODO: this information needs to be regenerated from pipeline reports on 
SCM restart
- */
-public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
-
-  /** Constructs a Node2PipelineMap Object. */
-  public Node2PipelineMap() {
-    super();
-  }
-
-  /**
-   * Returns null if there no pipelines associated with this datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of pipelines or Null.
-   */
-  public Set<PipelineID> getPipelines(UUID datanode) {
-    return getObjects(datanode);
-  }
-
-  /**
-   * Adds a pipeline entry to a given dataNode in the map.
-   *
-   * @param pipeline Pipeline to be added
-   */
-  public synchronized void addPipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
-      UUID dnId = details.getUuid();
-      dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
-          .add(pipeline.getId());
-    }
-  }
-
-  public synchronized void removePipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
-      UUID dnId = details.getUuid();
-      dn2ObjectMap.computeIfPresent(dnId,
-          (k, v) -> {
-            v.remove(pipeline.getId());
-            return v;
-          });
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 59d937e..c8d22ff 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -33,7 +33,6 @@ import 
org.apache.hadoop.hdds.scm.container.placement.algorithms
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -61,7 +60,6 @@ import java.util.HashMap;
 import java.util.Set;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
@@ -85,7 +83,7 @@ public class PipelineSelector {
   private final long containerSize;
   private final MetadataStore pipelineStore;
   private final PipelineStateManager stateManager;
-  private final Node2PipelineMap node2PipelineMap;
+  private final NodeManager nodeManager;
   private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap;
   private final Map<PipelineID, Pipeline> pipelineMap;
   private final LeaseManager<Pipeline> pipelineLeaseManager;
@@ -105,7 +103,6 @@ public class PipelineSelector {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
-    node2PipelineMap = new Node2PipelineMap();
     pipelineMap = new ConcurrentHashMap<>();
     pipelineManagerMap = new HashMap<>();
 
@@ -124,6 +121,7 @@ public class PipelineSelector {
     pipelineLeaseManager.start();
 
     stateManager = new PipelineStateManager();
+    this.nodeManager = nodeManager;
     pipeline2ContainerMap = new HashMap<>();
 
     // Write the container name to pipeline mapping.
@@ -361,10 +359,6 @@ public class PipelineSelector {
     }
   }
 
-  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
-    return node2PipelineMap.getPipelines(dnId);
-  }
-
   private void addExistingPipeline(Pipeline pipeline) throws IOException {
     LifeCycleState state = pipeline.getLifeCycleState();
     switch (state) {
@@ -379,7 +373,7 @@ public class PipelineSelector {
       // when all the nodes have reported.
       pipelineMap.put(pipeline.getId(), pipeline);
       pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
-      node2PipelineMap.addPipeline(pipeline);
+      nodeManager.addPipeline(pipeline);
       // reset the datanodes in the pipeline
       // they will be reset on
       pipeline.resetPipeline();
@@ -393,7 +387,7 @@ public class PipelineSelector {
   }
 
   public void handleStaleNode(DatanodeDetails dn) {
-    Set<PipelineID> pipelineIDs = getPipelineByDnID(dn.getUuid());
+    Set<PipelineID> pipelineIDs = nodeManager.getPipelineByDnID(dn.getUuid());
     for (PipelineID id : pipelineIDs) {
       LOG.info("closing pipeline {}.", id);
       eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
@@ -436,7 +430,7 @@ public class PipelineSelector {
       case CREATE:
         pipelineMap.put(pipeline.getId(), pipeline);
         pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
-        node2PipelineMap.addPipeline(pipeline);
+        nodeManager.addPipeline(pipeline);
         // Acquire lease on pipeline
         Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
         // Register callback to be executed in case of timeout
@@ -459,7 +453,7 @@ public class PipelineSelector {
       case TIMEOUT:
         closePipeline(pipeline);
         pipeline2ContainerMap.remove(pipeline.getId());
-        node2PipelineMap.removePipeline(pipeline);
+        nodeManager.removePipeline(pipeline);
         pipelineMap.remove(pipeline.getId());
         break;
       default:

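PipelineSelector no longer keeps its own Node2PipelineMap; pipeline membership for a datanode is resolved through NodeManager, as in this condensed sketch of the stale-node path (the wrapper class is hypothetical, and firing SCMEvents.PIPELINE_CLOSE per pipeline is elided):

import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.node.NodeManager;

// Condensed mirror of handleStaleNode above; assumes the node is known
// to the map. The println stands in for the PIPELINE_CLOSE event.
final class StaleNodeSketch {
  static void handleStaleNode(NodeManager nodeManager, UUID dnId) {
    Set<PipelineID> pipelineIDs = nodeManager.getPipelineByDnID(dnId);
    for (PipelineID id : pipelineIDs) {
      System.out.println("closing pipeline " + id);
    }
  }
}
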
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index bb72075..a6a967c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
-import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
@@ -212,8 +211,6 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     scmBlockManager = new BlockManagerImpl(
         conf, getScmNodeManager(), scmContainerManager, eventQueue);
 
-    Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
-
     replicationStatus = new ReplicationActivityStatus();
 
     CloseContainerEventHandler closeContainerHandler =
@@ -226,18 +223,17 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
 
-    NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
+    NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager);
     StaleNodeHandler staleNodeHandler =
-        new StaleNodeHandler(node2ContainerMap,
-                scmContainerManager.getPipelineSelector());
-    DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
-        getScmContainerManager().getStateManager(), scmNodeManager);
+        new StaleNodeHandler(scmContainerManager.getPipelineSelector());
+    DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
+        getScmContainerManager().getStateManager());
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
     PendingDeleteHandler pendingDeleteHandler =
         new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
 
     ContainerReportHandler containerReportHandler =
-        new ContainerReportHandler(scmContainerManager, node2ContainerMap,
+        new ContainerReportHandler(scmContainerManager, scmNodeManager,
             replicationStatus);
     scmChillModeManager = new SCMChillModeManager(conf,
         getScmContainerManager().getStateManager().getAllContainers(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 0d90728..3221053 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -19,9 +19,14 @@ package org.apache.hadoop.hdds.scm.container;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -29,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
@@ -42,6 +48,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
@@ -73,12 +80,16 @@ public class MockNodeManager implements NodeManager {
   private final SCMNodeStat aggregateStat;
   private boolean chillmode;
   private final Map<UUID, List<SCMCommand>> commandMap;
+  private final Node2PipelineMap node2PipelineMap;
+  private final Node2ContainerMap node2ContainerMap;
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
     this.staleNodes = new LinkedList<>();
     this.deadNodes = new LinkedList<>();
     this.nodeMetricMap = new HashMap<>();
+    this.node2PipelineMap = new Node2PipelineMap();
+    this.node2ContainerMap = new Node2ContainerMap();
     aggregateStat = new SCMNodeStat();
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
@@ -289,6 +300,34 @@ public class MockNodeManager implements NodeManager {
     return null;
   }
 
+  /**
+   * Get set of pipelines a datanode is part of.
+   * @param dnId - datanodeID
+   * @return Set of PipelineID
+   */
+  @Override
+  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
+    return node2PipelineMap.getPipelines(dnId);
+  }
+
+  /**
+   * Add pipeline information to the NodeManager.
+   * @param pipeline - Pipeline to be added
+   */
+  @Override
+  public void addPipeline(Pipeline pipeline) {
+    node2PipelineMap.addPipeline(pipeline);
+  }
+
+  /**
+   * Remove pipeline information from the NodeManager.
+   * @param pipeline - Pipeline to be removed
+   */
+  @Override
+  public void removePipeline(Pipeline pipeline) {
+    node2PipelineMap.removePipeline(pipeline);
+  }
+
   @Override
   public void addDatanodeCommand(UUID dnId, SCMCommand command) {
     if(commandMap.containsKey(dnId)) {
@@ -313,6 +352,54 @@ public class MockNodeManager implements NodeManager {
     // do nothing
   }
 
+  /**
+   * Update set of containers available on a datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @throws SCMException - if datanode is not known. For new datanode use
+   *                        addDatanodeInContainerMap call.
+   */
+  @Override
+  public void setContainersForDatanode(UUID uuid, Set<ContainerID> 
containerIds)
+      throws SCMException {
+    node2ContainerMap.setContainersForDatanode(uuid, containerIds);
+  }
+
+  /**
+   * Process containerReport received from datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @return The result after processing containerReport
+   */
+  @Override
+  public ReportResult<ContainerID> processContainerReport(UUID uuid,
+      Set<ContainerID> containerIds) {
+    return node2ContainerMap.processReport(uuid, containerIds);
+  }
+
+  /**
+   * Return set of containerIDs available on a datanode.
+   * @param uuid - DatanodeID
+   * @return - set of containerIDs
+   */
+  @Override
+  public Set<ContainerID> getContainers(UUID uuid) {
+    return node2ContainerMap.getContainers(uuid);
+  }
+
+  /**
+   * Insert a new datanode with set of containerIDs for containers available
+   * on it.
+   * @param uuid - DatanodeID
+   * @param containerIDs - Set of ContainerIDs
+   * @throws SCMException - if datanode already exists
+   */
+  @Override
+  public void addDatanodeInContainerMap(UUID uuid,
+      Set<ContainerID> containerIDs) throws SCMException {
+    node2ContainerMap.insertNewDatanode(uuid, containerIDs);
+  }
+
   // Returns the number of commands that is queued to this node manager.
   public int getCommandCount(DatanodeDetails dd) {
     List<SCMCommand> list = commandMap.get(dd.getUuid());

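Unlike ReplicationNodeManagerMock further down, which throws UnsupportedOperationException for the new methods, MockNodeManager backs them with real Node2ContainerMap and Node2PipelineMap instances, so handlers under test observe consistent state; TestContainerReportHandler relies on this. A minimal setup sketch (the wrapper names are hypothetical):

import java.util.HashSet;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;

// Test-side sketch: register one datanode against the mock, then hand
// the NodeManager to the handler under test.
final class MockNodeManagerSketch {
  static NodeManager withRegisteredNode(UUID dnId) throws SCMException {
    NodeManager nodeManager = new MockNodeManager(true, 1);
    nodeManager.addDatanodeInContainerMap(dnId, new HashSet<>());
    return nodeManager;
  }
}
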
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index a59179b..f79ae1e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -37,7 +37,7 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.replication
     .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
-import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
 public class TestContainerReportHandler implements EventPublisher {
 
   private List<Object> publishedEvents = new ArrayList<>();
+  private final NodeManager nodeManager = new MockNodeManager(true, 1);
 
   private static final Logger LOG =
       LoggerFactory.getLogger(TestContainerReportHandler.class);
@@ -73,7 +74,6 @@ public class TestContainerReportHandler implements 
EventPublisher {
   public void test() throws IOException {
     //GIVEN
     OzoneConfiguration conf = new OzoneConfiguration();
-    Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
     Mapping mapping = Mockito.mock(Mapping.class);
     PipelineSelector selector = Mockito.mock(PipelineSelector.class);
 
@@ -96,17 +96,17 @@ public class TestContainerReportHandler implements 
EventPublisher {
         new ReplicationActivityStatus();
 
     ContainerReportHandler reportHandler =
-        new ContainerReportHandler(mapping, node2ContainerMap,
+        new ContainerReportHandler(mapping, nodeManager,
             replicationActivityStatus);
 
     DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
     DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
     DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
     DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
-    node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>());
-    node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>());
-    node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>());
-    node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>());
+    nodeManager.addDatanodeInContainerMap(dn1.getUuid(), new HashSet<>());
+    nodeManager.addDatanodeInContainerMap(dn2.getUuid(), new HashSet<>());
+    nodeManager.addDatanodeInContainerMap(dn3.getUuid(), new HashSet<>());
+    nodeManager.addDatanodeInContainerMap(dn4.getUuid(), new HashSet<>());
     PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
 
     Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 6966322..7bba032 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -39,7 +39,6 @@ import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -60,7 +59,6 @@ public class TestDeadNodeHandler {
 
   private List<ReplicationRequest> sentEvents = new ArrayList<>();
   private SCMNodeManager nodeManager;
-  private Node2ContainerMap node2ContainerMap;
   private ContainerStateManager containerStateManager;
   private NodeReportHandler nodeReportHandler;
   private DeadNodeHandler deadNodeHandler;
@@ -70,14 +68,13 @@ public class TestDeadNodeHandler {
   @Before
   public void setup() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
-    node2ContainerMap = new Node2ContainerMap();
     containerStateManager = new ContainerStateManager(conf,
         Mockito.mock(Mapping.class),
         Mockito.mock(PipelineSelector.class));
     eventQueue = new EventQueue();
     nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue);
-    deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
-        containerStateManager, nodeManager);
+    deadNodeHandler = new DeadNodeHandler(nodeManager,
+        containerStateManager);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     publisher = Mockito.mock(EventPublisher.class);
     nodeReportHandler = new NodeReportHandler(nodeManager);
@@ -96,8 +93,8 @@ public class TestDeadNodeHandler {
     ContainerInfo container3 =
         TestUtils.allocateContainer(containerStateManager);
 
-    registerReplicas(node2ContainerMap, datanode1, container1, container2);
-    registerReplicas(node2ContainerMap, datanode2, container1, container3);
+    registerReplicas(datanode1, container1, container2);
+    registerReplicas(datanode2, container1, container3);
 
     registerReplicas(containerStateManager, container1, datanode1, datanode2);
     registerReplicas(containerStateManager, container2, datanode1);
@@ -105,13 +102,8 @@ public class TestDeadNodeHandler {
 
     TestUtils.closeContainer(containerStateManager, container1);
 
-    //WHEN datanode1 is dead
     deadNodeHandler.onMessage(datanode1, publisher);
 
-    //THEN
-    //node2ContainerMap has not been changed
-    Assert.assertEquals(2, node2ContainerMap.size());
-
     Set<DatanodeDetails> container1Replicas =
         containerStateManager.getContainerStateMap()
             .getContainerReplicas(new 
ContainerID(container1.getContainerID()));
@@ -168,7 +160,7 @@ public class TestDeadNodeHandler {
 
     ContainerInfo container1 =
         TestUtils.allocateContainer(containerStateManager);
-    registerReplicas(node2ContainerMap, datanode1, container1);
+    registerReplicas(datanode1, container1);
 
     SCMNodeStat stat = nodeManager.getStats();
     Assert.assertTrue(stat.getCapacity().get() == 300);
@@ -211,7 +203,7 @@ public class TestDeadNodeHandler {
 
     ContainerInfo container1 =
         TestUtils.allocateContainer(containerStateManager);
-    registerReplicas(node2ContainerMap, dn1, container1);
+    registerReplicas(dn1, container1);
 
     deadNodeHandler.onMessage(dn1, eventQueue);
     Assert.assertTrue(logCapturer.getOutput().contains(
@@ -226,12 +218,11 @@ public class TestDeadNodeHandler {
             datanodes);
   }
 
-  private void registerReplicas(Node2ContainerMap node2ConMap,
-      DatanodeDetails datanode,
+  private void registerReplicas(DatanodeDetails datanode,
       ContainerInfo... containers)
       throws SCMException {
-    node2ConMap
-        .insertNewDatanode(datanode.getUuid(),
+    nodeManager
+        .addDatanodeInContainerMap(datanode.getUuid(),
             Arrays.stream(containers)
                 .map(container -> new ContainerID(container.getContainerID()))
                 .collect(Collectors.toSet()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index a9a00ef..74c3932 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.ozone.container.testutils;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.CommandQueue;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -30,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -39,6 +44,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -217,6 +223,82 @@ public class ReplicationNodeManagerMock implements 
NodeManager {
   }
 
   /**
+   * Get set of pipelines a datanode is part of.
+   * @param dnId - datanodeID
+   * @return Set of PipelineID
+   */
+  @Override
+  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
+   * Add pipeline information to the NodeManager.
+   * @param pipeline - Pipeline to be added
+   */
+  @Override
+  public void addPipeline(Pipeline pipeline) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
+   * Remove pipeline information from the NodeManager.
+   * @param pipeline - Pipeline to be removed
+   */
+  @Override
+  public void removePipeline(Pipeline pipeline) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
+   * Update set of containers available on a datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @throws SCMException - if datanode is not known. For new datanode use
+   *                        addDatanodeInContainerMap call.
+   */
+  @Override
+  public void setContainersForDatanode(UUID uuid, Set<ContainerID> 
containerIds)
+      throws SCMException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
+   * Process containerReport received from datanode.
+   * @param uuid - DatanodeID
+   * @param containerIds - Set of containerIDs
+   * @return The result after processing containerReport
+   */
+  @Override
+  public ReportResult<ContainerID> processContainerReport(UUID uuid,
+      Set<ContainerID> containerIds) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
+   * Return set of containerIDs available on a datanode.
+   * @param uuid - DatanodeID
+   * @return - set of containerIDs
+   */
+  @Override
+  public Set<ContainerID> getContainers(UUID uuid) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
+   * Insert a new datanode with set of containerIDs for containers available
+   * on it.
+   * @param uuid - DatanodeID
+   * @param containerIDs - Set of ContainerIDs
+   * @throws SCMException - if datanode already exists
+   */
+  @Override
+  public void addDatanodeInContainerMap(UUID uuid,
+      Set<ContainerID> containerIDs) throws SCMException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
    * Closes this stream and releases any system resources associated
    * with it. If the stream is already closed then invoking this
    * method has no effect.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index ad3798e..e61a909 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -97,7 +97,7 @@ public class TestNode2PipelineMap {
     Assert.assertEquals(3, dns.size());
 
     // get pipeline details by dnid
-    Set<PipelineID> pipelines = mapping.getPipelineSelector()
+    Set<PipelineID> pipelines = scm.getScmNodeManager()
         .getPipelineByDnID(dns.get(0).getUuid());
     Assert.assertEquals(1, pipelines.size());
     pipelines.forEach(p -> Assert.assertEquals(p,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 5eabfb9..b02eae2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -119,7 +119,7 @@ public class TestPipelineClose {
         HddsProtos.LifeCycleState.CLOSED);
     for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as 
well
-      Assert.assertEquals(pipelineSelector.getPipelineByDnID(
+      Assert.assertEquals(scm.getScmNodeManager().getPipelineByDnID(
           dn.getUuid()).size(), 0);
     }
   }

