http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
new file mode 100644
index 0000000..0174c17
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -0,0 +1,904 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.VersionInfo;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+    .ErrorCode;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
+    .HEALTHY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
+    .INVALID;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * Maintains information about the Datanodes on SCM side.
+ * <p>
+ * Heartbeat processing in SCM is very simple compared to the HDFS
+ * HeartbeatManager.
+ * <p>
+ * Here we maintain 3 maps, and we propagate a node from healthyNodesMap to
+ * staleNodesMap to deadNodesMap. This moving of a node from one map to another
+ * is controlled by 4 configuration variables. These variables define how many
+ * heartbeats must go missing for the node to move from one map to another.
+ * <p>
+ * Each heartbeat that SCMNodeManager receives is put into the heartbeatQueue.
+ * The worker thread wakes up and grabs that heartbeat from the queue. The
+ * worker thread will look up the healthyNodes map and set the timestamp if
+ * the entry is there; if not, it will look up the stale and dead nodes maps.
+ * <p>
+ * The getNode(byState) functions make a copy of the node maps and then create
+ * a list based on that copy. It should be assumed that these get functions
+ * always report *stale* information. For example, getting the deadNodeCount
+ * followed by getNodes(DEAD) could very well produce a totally different
+ * count. Also getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE)
+ * is not guaranteed to add up to the total number of nodes that we know of.
+ * Please treat all get functions in this file as a snapshot of information
+ * that is inconsistent as soon as you read it.
+ */
+public class SCMNodeManager
+    implements NodeManager, StorageContainerNodeProtocol {
+
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodeManager.class);
+
+  /**
+   * Key = NodeID, value = timestamp.
+   */
+  private final ConcurrentHashMap<UUID, Long> healthyNodes;
+  private final ConcurrentHashMap<UUID, Long> staleNodes;
+  private final ConcurrentHashMap<UUID, Long> deadNodes;
+  private final Queue<HeartbeatQueueItem> heartbeatQueue;
+  private final ConcurrentHashMap<UUID, DatanodeDetails> nodes;
+  // Individual live node stats
+  private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
+  // Aggregated node stats
+  private SCMNodeStat scmStat;
+  // TODO: expose nodeStats and scmStat as metrics
+  private final AtomicInteger healthyNodeCount;
+  private final AtomicInteger staleNodeCount;
+  private final AtomicInteger deadNodeCount;
+  private final AtomicInteger totalNodes;
+  private long staleNodeIntervalMs;
+  private final long deadNodeIntervalMs;
+  private final long heartbeatCheckerIntervalMs;
+  private final long datanodeHBIntervalSeconds;
+  private final ScheduledExecutorService executorService;
+  private long lastHBcheckStart;
+  private long lastHBcheckFinished = 0;
+  private long lastHBProcessedCount;
+  private int chillModeNodeCount;
+  private final int maxHBToProcessPerLoop;
+  private final String clusterID;
+  private final VersionInfo version;
+  /**
+   * During startup, SCM enters chill mode and stays there until the number
+   * of registered Datanodes reaches {@code chillModeNodeCount}.
+   * This flag is for tracking startup chill mode.
+   */
+  private AtomicBoolean inStartupChillMode;
+  /**
+   * Administrator can put SCM into chill mode manually.
+   * This flag is for tracking manual chill mode.
+   */
+  private AtomicBoolean inManualChillMode;
+  private final CommandQueue commandQueue;
+  // Node manager MXBean
+  private ObjectName nmInfoBean;
+
+  // Node pool manager.
+  private final SCMNodePoolManager nodePoolManager;
+  private final StorageContainerManager scmManager;
+
+  /**
+   * Constructs the SCM node manager.
+   */
+  public SCMNodeManager(OzoneConfiguration conf, String clusterID,
+      StorageContainerManager scmManager) throws IOException {
+    heartbeatQueue = new ConcurrentLinkedQueue<>();
+    healthyNodes = new ConcurrentHashMap<>();
+    deadNodes = new ConcurrentHashMap<>();
+    staleNodes = new ConcurrentHashMap<>();
+    nodes = new ConcurrentHashMap<>();
+    nodeStats = new ConcurrentHashMap<>();
+    scmStat = new SCMNodeStat();
+
+    healthyNodeCount = new AtomicInteger(0);
+    staleNodeCount = new AtomicInteger(0);
+    deadNodeCount = new AtomicInteger(0);
+    totalNodes = new AtomicInteger(0);
+    this.clusterID = clusterID;
+    this.version = VersionInfo.getLatestVersion();
+    commandQueue = new CommandQueue();
+
+    // TODO: Support this value as a Percentage of known machines.
+    chillModeNodeCount = 1;
+
+    staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf);
+    deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf);
+    heartbeatCheckerIntervalMs =
+        HddsServerUtil.getScmheartbeatCheckerInterval(conf);
+    datanodeHBIntervalSeconds = HddsServerUtil.getScmHeartbeatInterval(conf);
+    maxHBToProcessPerLoop = HddsServerUtil.getMaxHBToProcessPerLoop(conf);
+
+    executorService = HadoopExecutors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
+
+    LOG.info("Entering startup chill mode.");
+    this.inStartupChillMode = new AtomicBoolean(true);
+    this.inManualChillMode = new AtomicBoolean(false);
+
+    Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
+    executorService.schedule(this, heartbeatCheckerIntervalMs,
+        TimeUnit.MILLISECONDS);
+
+    registerMXBean();
+
+    this.nodePoolManager = new SCMNodePoolManager(conf);
+    this.scmManager = scmManager;
+  }
+
+  private void registerMXBean() {
+    this.nmInfoBean = MBeans.register("SCMNodeManager",
+        "SCMNodeManagerInfo", this);
+  }
+
+  private void unregisterMXBean() {
+    if(this.nmInfoBean != null) {
+      MBeans.unregister(this.nmInfoBean);
+      this.nmInfoBean = null;
+    }
+  }
+
+  /**
+   * Removes a data node from the management of this Node Manager.
+   *
+   * @param node - DataNode.
+   * @throws UnregisteredNodeException
+   */
+  @Override
+  public void removeNode(DatanodeDetails node) {
+    // TODO : Fix me when adding the SCM CLI.
+
+  }
+
+  /**
+   * Gets all datanodes that are in a certain state. This function works by
+   * taking a snapshot of the current collection and then returning the list
+   * from that collection. This means that the real map might have changed by
+   * the time we return this list.
+   *
+   * @return List of Datanodes that are known to SCM in the requested state.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(NodeState nodestate)
+      throws IllegalArgumentException {
+    Map<UUID, Long> set;
+    switch (nodestate) {
+    case HEALTHY:
+      synchronized (this) {
+        set = Collections.unmodifiableMap(new HashMap<>(healthyNodes));
+      }
+      break;
+    case STALE:
+      synchronized (this) {
+        set = Collections.unmodifiableMap(new HashMap<>(staleNodes));
+      }
+      break;
+    case DEAD:
+      synchronized (this) {
+        set = Collections.unmodifiableMap(new HashMap<>(deadNodes));
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown node state requested.");
+    }
+
+    return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns all datanodes that are known to SCM.
+   *
+   * @return List of DatanodeDetails
+   */
+  @Override
+  public List<DatanodeDetails> getAllNodes() {
+    Map<UUID, DatanodeDetails> set;
+    synchronized (this) {
+      set = Collections.unmodifiableMap(new HashMap<>(nodes));
+    }
+    return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the minimum number of nodes to get out of Chill mode.
+   *
+   * @return int
+   */
+  @Override
+  public int getMinimumChillModeNodes() {
+    return chillModeNodeCount;
+  }
+
+  /**
+   * Sets the Minimum chill mode nodes count, used only in testing.
+   *
+   * @param count - Number of nodes.
+   */
+  @VisibleForTesting
+  public void setMinimumChillModeNodes(int count) {
+    chillModeNodeCount = count;
+  }
+
+  /**
+   * Returns chill mode Status string.
+   * @return String
+   */
+  @Override
+  public String getChillModeStatus() {
+    if (inStartupChillMode.get()) {
+      return "Still in chill mode, waiting on nodes to report in." +
+          String.format(" %d nodes reported, minimal %d nodes required.",
+              totalNodes.get(), getMinimumChillModeNodes());
+    }
+    if (inManualChillMode.get()) {
+      return "Out of startup chill mode, but in manual chill mode." +
+          String.format(" %d nodes have reported in.", totalNodes.get());
+    }
+    return "Out of chill mode." +
+        String.format(" %d nodes have reported in.", totalNodes.get());
+  }
+
+  /**
+   * Forcefully exits the chill mode even if we have not met the minimum
+   * criteria of exiting the chill mode. This will exit from both startup
+   * and manual chill mode.
+   */
+  @Override
+  public void forceExitChillMode() {
+    if(inStartupChillMode.get()) {
+      LOG.info("Leaving startup chill mode.");
+      inStartupChillMode.set(false);
+    }
+    if(inManualChillMode.get()) {
+      LOG.info("Leaving manual chill mode.");
+      inManualChillMode.set(false);
+    }
+  }
+
+  /**
+   * Puts the node manager into manual chill mode.
+   */
+  @Override
+  public void enterChillMode() {
+    LOG.info("Entering manual chill mode.");
+    inManualChillMode.set(true);
+  }
+
+  /**
+   * Brings node manager out of manual chill mode.
+   */
+  @Override
+  public void exitChillMode() {
+    LOG.info("Leaving manual chill mode.");
+    inManualChillMode.set(false);
+  }
+
+  /**
+   * Returns true if node manager is out of chill mode, else false.
+   * @return true if out of chill mode, else false
+   */
+  @Override
+  public boolean isOutOfChillMode() {
+    return !(inStartupChillMode.get() || inManualChillMode.get());
+  }
+
+  /**
+   * Returns the number of Datanodes that are in the requested state.
+   *
+   * @return int -- count
+   */
+  @Override
+  public int getNodeCount(NodeState nodestate) {
+    switch (nodestate) {
+    case HEALTHY:
+      return healthyNodeCount.get();
+    case STALE:
+      return staleNodeCount.get();
+    case DEAD:
+      return deadNodeCount.get();
+    case INVALID:
+      // This is unknown because some nodes can be in transit between the
+      // other states, so returning a count for it is not possible. This
+      // state exists to deal with the fact that this information might not
+      // always be consistent.
+      return 0;
+    default:
+      return 0;
+    }
+  }
+
+  /**
+   * Used for testing.
+   *
+   * @return true if the HB check is done.
+   */
+  @VisibleForTesting
+  @Override
+  public boolean waitForHeartbeatProcessed() {
+    return lastHBcheckFinished != 0;
+  }
+
+  /**
+   * Returns the node state of a specific node.
+   *
+   * @param datanodeDetails - Datanode Details
+   * @return Healthy/Stale/Dead/Unknown.
+   */
+  @Override
+  public NodeState getNodeState(DatanodeDetails datanodeDetails) {
+    // There is a subtle race condition here, hence we also support
+    // the unknown state (NodeState.INVALID). It is possible that just before
+    // we check the healthyNodes map, we have removed the node from the
+    // healthy list but still not added it to the stale nodes list.
+    // We can fix that by adding the node to the stale list before we remove
+    // it, but then the node would be in 2 states. To avoid this race
+    // condition, we just deal with the possibility of getting a state
+    // called unknown.
+
+    UUID id = datanodeDetails.getUuid();
+    if(healthyNodes.containsKey(id)) {
+      return HEALTHY;
+    }
+
+    if(staleNodes.containsKey(id)) {
+      return STALE;
+    }
+
+    if(deadNodes.containsKey(id)) {
+      return DEAD;
+    }
+
+    return INVALID;
+  }
+
+  /**
+   * This is the real worker thread that processes the HB queue. We do the
+   * following things in this thread.
+   * <p>
+   * Process the heartbeats that are in the HB queue. Move a stale or dead
+   * node to healthy if we got a heartbeat from it. Move stale nodes to the
+   * dead node table if needed. Move healthy nodes to the stale node table if
+   * needed.
+   * <p>
+   * If it is a new node, we call register node and add it to the list of
+   * nodes. This will be replaced when we support registration of a node in
+   * SCM.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    lastHBcheckStart = monotonicNow();
+    lastHBProcessedCount = 0;
+
+    // Process the whole queue.
+    while (!heartbeatQueue.isEmpty() &&
+        (lastHBProcessedCount < maxHBToProcessPerLoop)) {
+      HeartbeatQueueItem hbItem = heartbeatQueue.poll();
+      synchronized (this) {
+        handleHeartbeat(hbItem);
+      }
+      // If we are shutting down or something, give up processing the rest of
+      // the HBs. This will terminate the HB processing thread.
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.info("Current thread is interrupted, shutting down HB " +
+            "processing thread for Node Manager.");
+        return;
+      }
+    }
+
+    if (lastHBProcessedCount >= maxHBToProcessPerLoop) {
+      LOG.error("SCM is being flooded by heartbeats. Not able to keep up with" 
+
+          " the heartbeat counts. Processed {} heartbeats. Breaking out of" +
+          " loop. Leaving rest to be processed later. ", lastHBProcessedCount);
+    }
+
+    // Iterate over the Stale nodes and decide if we need to move any node to
+    // dead State.
+    long currentTime = monotonicNow();
+    for (Map.Entry<UUID, Long> entry : staleNodes.entrySet()) {
+      if (currentTime - entry.getValue() > deadNodeIntervalMs) {
+        synchronized (this) {
+          moveStaleNodeToDead(entry);
+        }
+      }
+    }
+
+    // Iterate over the healthy nodes and decide if we need to move any node to
+    // Stale State.
+    currentTime = monotonicNow();
+    for (Map.Entry<UUID, Long> entry : healthyNodes.entrySet()) {
+      if (currentTime - entry.getValue() > staleNodeIntervalMs) {
+        synchronized (this) {
+          moveHealthyNodeToStale(entry);
+        }
+      }
+    }
+    lastHBcheckFinished = monotonicNow();
+
+    monitorHBProcessingTime();
+
+    // we purposefully make this non-deterministic. Instead of using a
+    // scheduleAtFixedFrequency  we will just go to sleep
+    // and wake up at the next rendezvous point, which is currentTime +
+    // heartbeatCheckerIntervalMs. This leads to the issue that we are now
+    // heart beating not at a fixed cadence, but clock tick + time taken to
+    // work.
+    //
+    // This time taken to work can skew the heartbeat processor thread.
+    // The reason why we don't care is because of the following reasons.
+    //
+    // 1. checkerInterval is generally many magnitudes faster than the
+    // datanode HB frequency.
+    //
+    // 2. if we have too many nodes, the SCM would be doing only HB
+    // processing; this could lead to SCM's CPU starvation. With this
+    // approach we always guarantee that the HB thread sleeps for a little
+    // while.
+    //
+    // 3. It is possible that we will never finish processing the HB's in the
+    // thread. But that means we have a mis-configured system. We will warn
+    // the users by logging that information.
+    //
+    // 4. And the most important reason, heartbeats are not blocked even if
+    // this thread does not run, they will go into the processing queue.
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit
+          .MILLISECONDS);
+    } else {
+      LOG.info("Current Thread is interrupted, shutting down HB processing " +
+          "thread for Node Manager.");
+    }
+  }
+
+  /**
+   * If we have taken too much time for HB processing, log that information.
+   */
+  private void monitorHBProcessingTime() {
+    if (TimeUnit.MILLISECONDS.toSeconds(lastHBcheckFinished -
+        lastHBcheckStart) > datanodeHBIntervalSeconds) {
+      LOG.error("Total time spend processing datanode HB's is greater than " +
+              "configured values for datanode heartbeats. Please adjust the" +
+              " heartbeat configs. Time Spend on HB processing: {} seconds " +
+              "Datanode heartbeat Interval: {} seconds , heartbeats " +
+              "processed: {}",
+          TimeUnit.MILLISECONDS
+              .toSeconds(lastHBcheckFinished - lastHBcheckStart),
+          datanodeHBIntervalSeconds, lastHBProcessedCount);
+    }
+  }
+
+  /**
+   * Moves a Healthy node to a Stale node state.
+   *
+   * @param entry - Map Entry
+   */
+  private void moveHealthyNodeToStale(Map.Entry<UUID, Long> entry) {
+    LOG.trace("Moving healthy node to stale: {}", entry.getKey());
+    healthyNodes.remove(entry.getKey());
+    healthyNodeCount.decrementAndGet();
+    staleNodes.put(entry.getKey(), entry.getValue());
+    staleNodeCount.incrementAndGet();
+
+    if (scmManager != null) {
+      // remove stale node's container report
+      scmManager.removeContainerReport(entry.getKey().toString());
+    }
+  }
+
+  /**
+   * Moves a Stale node to a dead node state.
+   *
+   * @param entry - Map Entry
+   */
+  private void moveStaleNodeToDead(Map.Entry<UUID, Long> entry) {
+    LOG.trace("Moving stale node to dead: {}", entry.getKey());
+    staleNodes.remove(entry.getKey());
+    staleNodeCount.decrementAndGet();
+    deadNodes.put(entry.getKey(), entry.getValue());
+    deadNodeCount.incrementAndGet();
+
+    // Update SCM node stats
+    SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey());
+    scmStat.subtract(deadNodeStat);
+    nodeStats.remove(entry.getKey());
+  }
+
+  /**
+   * Handles a single heartbeat from a datanode.
+   *
+   * @param hbItem - heartbeat item from a datanode.
+   */
+  private void handleHeartbeat(HeartbeatQueueItem hbItem) {
+    lastHBProcessedCount++;
+
+    DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails();
+    UUID datanodeUuid = datanodeDetails.getUuid();
+    SCMNodeReport nodeReport = hbItem.getNodeReport();
+    long recvTimestamp = hbItem.getRecvTimestamp();
+    long processTimestamp = Time.monotonicNow();
+    if (LOG.isTraceEnabled()) {
+      //TODO: add average queue time of heartbeat request as metrics
+      LOG.trace("Processing Heartbeat from datanode {}: queueing time {}",
+          datanodeUuid, processTimestamp - recvTimestamp);
+    }
+
+    // If this node is already in the list of known and healthy nodes
+    // just set the last timestamp and return.
+    if (healthyNodes.containsKey(datanodeUuid)) {
+      healthyNodes.put(datanodeUuid, processTimestamp);
+      updateNodeStat(datanodeUuid, nodeReport);
+      updateCommandQueue(datanodeUuid,
+          hbItem.getContainerReportState().getState());
+      return;
+    }
+
+    // A stale node has heartbeated to us; we need to remove it from the
+    // stale list and move it to the healthy list.
+    if (staleNodes.containsKey(datanodeUuid)) {
+      staleNodes.remove(datanodeUuid);
+      healthyNodes.put(datanodeUuid, processTimestamp);
+      healthyNodeCount.incrementAndGet();
+      staleNodeCount.decrementAndGet();
+      updateNodeStat(datanodeUuid, nodeReport);
+      updateCommandQueue(datanodeUuid,
+          hbItem.getContainerReportState().getState());
+      return;
+    }
+
+    // A dead node has heartbeated to us; we need to remove that node from
+    // the dead node list and move it to the healthy list.
+    if (deadNodes.containsKey(datanodeUuid)) {
+      deadNodes.remove(datanodeUuid);
+      healthyNodes.put(datanodeUuid, processTimestamp);
+      deadNodeCount.decrementAndGet();
+      healthyNodeCount.incrementAndGet();
+      updateNodeStat(datanodeUuid, nodeReport);
+      updateCommandQueue(datanodeUuid,
+          hbItem.getContainerReportState().getState());
+      return;
+    }
+
+    LOG.warn("SCM receive heartbeat from unregistered datanode {}",
+        datanodeUuid);
+    this.commandQueue.addCommand(datanodeUuid,
+        new ReregisterCommand());
+  }
+
+  private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) {
+    SCMNodeStat stat = nodeStats.get(dnId);
+    if (stat == null) {
+      LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
+          "dead datanode {}", dnId);
+      stat = new SCMNodeStat();
+    }
+
+    if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
+      long totalCapacity = 0;
+      long totalRemaining = 0;
+      long totalScmUsed = 0;
+      List<SCMStorageReport> storageReports =
+          nodeReport.getStorageReportList();
+      for (SCMStorageReport report : storageReports) {
+        totalCapacity += report.getCapacity();
+        totalRemaining += report.getRemaining();
+        totalScmUsed += report.getScmUsed();
+      }
+      scmStat.subtract(stat);
+      stat.set(totalCapacity, totalScmUsed, totalRemaining);
+      nodeStats.put(dnId, stat);
+      scmStat.add(stat);
+    }
+  }
+
+  private void updateCommandQueue(UUID dnId,
+                                  ReportState.states containerReportState) {
+    if (containerReportState != null) {
+      switch (containerReportState) {
+      case completeContinerReport:
+        commandQueue.addCommand(dnId,
+            SendContainerCommand.newBuilder().build());
+        return;
+      case deltaContainerReport:
+      case noContainerReports:
+      default:
+        // do nothing
+      }
+    }
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it.
+   * If the stream is already closed then invoking this method has no effect.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    unregisterMXBean();
+    nodePoolManager.close();
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown NodeManager properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @VisibleForTesting
+  long getLastHBProcessedCount() {
+    return lastHBProcessedCount;
+  }
+
+  /**
+   * Gets the version info from SCM.
+   *
+   * @param versionRequest - version Request.
+   * @return - returns SCM version info and other required information
+   * needed by datanode.
+   */
+  @Override
+  public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
+    return VersionResponse.newBuilder()
+        .setVersion(this.version.getVersion())
+        .build();
+  }
+
+  /**
+   * Register the node if the node finds that it is not registered with any
+   * SCM.
+   *
+   * @param datanodeDetailsProto - Send datanodeDetails with Node info.
+   *                   This function generates and assigns new datanode ID
+   *                   for the datanode. This allows SCM to be run independent
+   *                   of Namenode if required.
+   *
+   * @return SCMCommand
+   */
+  @Override
+  public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto) {
+
+    DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(
+        datanodeDetailsProto);
+    InetAddress dnAddress = Server.getRemoteIp();
+    if (dnAddress != null) {
+      // Mostly called inside an RPC, update ip and peer hostname
+      String hostname = dnAddress.getHostName();
+      String ip = dnAddress.getHostAddress();
+      datanodeDetails.setHostName(hostname);
+      datanodeDetails.setIpAddress(ip);
+    }
+    SCMCommand responseCommand = verifyDatanodeUUID(datanodeDetails);
+    if (responseCommand != null) {
+      return responseCommand;
+    }
+    UUID dnId = datanodeDetails.getUuid();
+    nodes.put(dnId, datanodeDetails);
+    totalNodes.incrementAndGet();
+    healthyNodes.put(dnId, monotonicNow());
+    healthyNodeCount.incrementAndGet();
+    nodeStats.put(dnId, new SCMNodeStat());
+
+    if(inStartupChillMode.get() &&
+        totalNodes.get() >= getMinimumChillModeNodes()) {
+      inStartupChillMode.getAndSet(false);
+      LOG.info("Leaving startup chill mode.");
+    }
+
+    // TODO: define node pool policy for non-default node pool.
+    // For now, all nodes are added to the "DefaultNodePool" upon registration
+    // if it has not been added to any node pool yet.
+    try {
+      if (nodePoolManager.getNodePool(datanodeDetails) == null) {
+        nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL,
+            datanodeDetails);
+      }
+    } catch (IOException e) {
+      // TODO: make sure registration failure is handled correctly.
+      return RegisteredCommand.newBuilder()
+          .setErrorCode(ErrorCode.errorNodeNotPermitted)
+          .build();
+    }
+    LOG.info("Data node with ID: {} Registered.",
+        datanodeDetails.getUuid());
+    return RegisteredCommand.newBuilder()
+        .setErrorCode(ErrorCode.success)
+        .setDatanodeUUID(datanodeDetails.getUuidString())
+        .setClusterID(this.clusterID)
+        .build();
+  }
+
+  /**
+   * Verifies the datanode does not have a valid UUID already.
+   *
+   * @param datanodeDetails - Datanode Details.
+   * @return SCMCommand
+   */
+  private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) {
+    if (datanodeDetails.getUuid() != null &&
+        nodes.containsKey(datanodeDetails.getUuid())) {
+      LOG.trace("Datanode is already registered. Datanode: {}",
+          datanodeDetails.toString());
+      return RegisteredCommand.newBuilder()
+          .setErrorCode(ErrorCode.success)
+          .setClusterID(this.clusterID)
+          .setDatanodeUUID(datanodeDetails.getUuidString())
+          .build();
+    }
+    return null;
+  }
+
+  /**
+   * Send heartbeat to indicate the datanode is alive and doing well.
+   *
+   * @param datanodeDetailsProto - DatanodeDetailsProto.
+   * @param nodeReport - node report.
+   * @param containerReportState - container report state.
+   * @return SCMheartbeat response.
+   * @throws IOException
+   */
+  @Override
+  public List<SCMCommand> sendHeartbeat(
+      DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
+      ReportState containerReportState) {
+
+    DatanodeDetails datanodeDetails = DatanodeDetails
+        .getFromProtoBuf(datanodeDetailsProto);
+
+    // Checking for null to make sure that we don't get
+    // an exception from the concurrent queue.
+    // This could only be a problem in tests; if this function is invoked via
+    // protobuf, the transport layer will guarantee that this is not null.
+    if (datanodeDetails != null) {
+      heartbeatQueue.add(
+          new HeartbeatQueueItem.Builder()
+              .setDatanodeDetails(datanodeDetails)
+              .setNodeReport(nodeReport)
+              .setContainerReportState(containerReportState)
+              .build());
+    } else {
+      LOG.error("Datanode ID in heartbeat is null");
+    }
+
+    return commandQueue.getCommand(datanodeDetails.getUuid());
+  }
+
+  /**
+   * Returns the aggregated node stats.
+   * @return the aggregated node stats.
+   */
+  @Override
+  public SCMNodeStat getStats() {
+    return new SCMNodeStat(this.scmStat);
+  }
+
+  /**
+   * Return a map of node stats.
+   * @return a map of individual node stats (live/stale but not dead).
+   */
+  @Override
+  public Map<UUID, SCMNodeStat> getNodeStats() {
+    return Collections.unmodifiableMap(nodeStats);
+  }
+
+  /**
+   * Return the node stat of the specified datanode.
+   * @param datanodeDetails - datanode ID.
+   * @return node stat if it is live/stale, null if it is dead or doesn't exist.
+   */
+  @Override
+  public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
+    return new SCMNodeMetric(nodeStats.get(datanodeDetails.getUuid()));
+  }
+
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return nodePoolManager;
+  }
+
+  @Override
+  public Map<String, Integer> getNodeCount() {
+    Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
+    for(NodeState state : NodeState.values()) {
+      nodeCountMap.put(state.toString(), getNodeCount(state));
+    }
+    return nodeCountMap;
+  }
+
+  @Override
+  public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+    this.commandQueue.addCommand(dnId, command);
+  }
+
+  @VisibleForTesting
+  public void setStaleNodeIntervalMs(long interval) {
+    this.staleNodeIntervalMs = interval;
+  }
+}
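For context, the healthy -> stale -> dead transitions described in the class
javadoc of SCMNodeManager boil down to the standalone sketch below. It is
illustrative only: the class name and the interval constants are assumptions,
the real intervals come from HddsServerUtil.getStaleNodeInterval() and
HddsServerUtil.getDeadNodeInterval(), and the real maps also maintain counters
and node stats.

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    /** Standalone sketch of the node state transitions; not SCM code. */
    public class NodeStateSketch {
      // Assumed illustrative intervals, not SCM defaults.
      private static final long STALE_INTERVAL_MS = 90_000L;
      private static final long DEAD_INTERVAL_MS = 600_000L;

      private final Map<UUID, Long> healthyNodes = new ConcurrentHashMap<>();
      private final Map<UUID, Long> staleNodes = new ConcurrentHashMap<>();
      private final Map<UUID, Long> deadNodes = new ConcurrentHashMap<>();

      /** A heartbeat always lands the node back in the healthy map. */
      void handleHeartbeat(UUID node, long nowMs) {
        staleNodes.remove(node);
        deadNodes.remove(node);
        healthyNodes.put(node, nowMs);
      }

      /** Periodic check, analogous to SCMNodeManager#run(). */
      void checkNodes(long nowMs) {
        healthyNodes.forEach((node, lastSeen) -> {
          if (nowMs - lastSeen > STALE_INTERVAL_MS) {
            healthyNodes.remove(node);
            staleNodes.put(node, lastSeen);
          }
        });
        staleNodes.forEach((node, lastSeen) -> {
          if (nowMs - lastSeen > DEAD_INTERVAL_MS) {
            staleNodes.remove(node);
            deadNodes.put(node, lastSeen);
          }
        });
      }
    }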

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
new file mode 100644
index 0000000..a4a6c51
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_FIND_NODE_IN_POOL;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_LOAD_NODEPOOL;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
+
+/**
+ * SCM node pool manager that manages node pools.
+ */
+public final class SCMNodePoolManager implements NodePoolManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodePoolManager.class);
+  private static final List<DatanodeDetails> EMPTY_NODE_LIST =
+      new ArrayList<>();
+  private static final List<String> EMPTY_NODEPOOL_LIST = new ArrayList<>();
+  public static final String DEFAULT_NODEPOOL = "DefaultNodePool";
+
+  // DB that saves the node to node pool mapping.
+  private MetadataStore nodePoolStore;
+
+  // In-memory node pool to nodes mapping
+  private HashMap<String, Set<DatanodeDetails>> nodePools;
+
+  // Read-write lock for nodepool operations
+  private ReadWriteLock lock;
+
+  /**
+   * Construct SCMNodePoolManager class that manages node to node pool mapping.
+   * @param conf - configuration.
+   * @throws IOException
+   */
+  public SCMNodePoolManager(final OzoneConfiguration conf)
+      throws IOException {
+    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    File metaDir = getOzoneMetaDirPath(conf);
+    String scmMetaDataDir = metaDir.getPath();
+    File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
+    nodePoolStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(nodePoolDBPath)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
+    nodePools = new HashMap<>();
+    lock = new ReentrantReadWriteLock();
+    init();
+  }
+
+  /**
+   * Initialize the in-memory store from the persisted LevelDB store.
+   * No lock is needed as init() is only invoked by constructor.
+   * @throws SCMException
+   */
+  private void init() throws SCMException {
+    try {
+      nodePoolStore.iterate(null, (key, value) -> {
+        try {
+          DatanodeDetails nodeId = DatanodeDetails.getFromProtoBuf(
+              HddsProtos.DatanodeDetailsProto.PARSER.parseFrom(key));
+          String poolName = DFSUtil.bytes2String(value);
+
+          Set<DatanodeDetails> nodePool = null;
+          if (nodePools.containsKey(poolName)) {
+            nodePool = nodePools.get(poolName);
+          } else {
+            nodePool = new HashSet<>();
+            nodePools.put(poolName, nodePool);
+          }
+          nodePool.add(nodeId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding node: {} to node pool: {}",
+                nodeId, poolName);
+          }
+        } catch (IOException e) {
+          LOG.warn("Can't add a datanode to node pool, continue next...");
+        }
+        return true;
+      });
+    } catch (IOException e) {
+      LOG.error("Loading node pool error " + e);
+      throw new SCMException("Failed to load node pool",
+          FAILED_TO_LOAD_NODEPOOL);
+    }
+  }
+
+  /**
+   * Add a datanode to a node pool.
+   * @param pool - name of the node pool.
+   * @param node - name of the datanode.
+   */
+  @Override
+  public void addNode(final String pool, final DatanodeDetails node)
+      throws IOException {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    Preconditions.checkNotNull(node, "node is null");
+    lock.writeLock().lock();
+    try {
+      // add to the persistent store
+      nodePoolStore.put(node.getProtoBufMessage().toByteArray(),
+          DFSUtil.string2Bytes(pool));
+
+      // add to the in-memory store
+      Set<DatanodeDetails> nodePool = null;
+      if (nodePools.containsKey(pool)) {
+        nodePool = nodePools.get(pool);
+      } else {
+        nodePool = new HashSet<DatanodeDetails>();
+        nodePools.put(pool, nodePool);
+      }
+      nodePool.add(node);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Remove a datanode from a node pool.
+   * @param pool - name of the node pool.
+   * @param node - datanode id.
+   * @throws SCMException
+   */
+  @Override
+  public void removeNode(final String pool, final DatanodeDetails node)
+      throws SCMException {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    Preconditions.checkNotNull(node, "node is null");
+    lock.writeLock().lock();
+    try {
+      // Remove from the persistent store
+      byte[] kName = node.getProtoBufMessage().toByteArray();
+      byte[] kData = nodePoolStore.get(kName);
+      if (kData == null) {
+        throw new SCMException(String.format("Unable to find node %s from" +
+            " pool %s in DB.", DFSUtil.bytes2String(kName), pool),
+            FAILED_TO_FIND_NODE_IN_POOL);
+      }
+      nodePoolStore.delete(kName);
+
+      // Remove from the in-memory store
+      if (nodePools.containsKey(pool)) {
+        Set<DatanodeDetails> nodePool = nodePools.get(pool);
+        nodePool.remove(node);
+      } else {
+        throw new SCMException(String.format("Unable to find node %s from" +
+            " pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
+            FAILED_TO_FIND_NODE_IN_POOL);
+      }
+    } catch (IOException e) {
+      throw new SCMException("Failed to remove node " + node.toString()
+          + " from node pool " + pool, e,
+          SCMException.ResultCodes.IO_EXCEPTION);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Get all the node pools.
+   * @return all the node pools.
+   */
+  @Override
+  public List<String> getNodePools() {
+    lock.readLock().lock();
+    try {
+      if (!nodePools.isEmpty()) {
+        return nodePools.keySet().stream().collect(Collectors.toList());
+      } else {
+        return EMPTY_NODEPOOL_LIST;
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get all datanodes of a specific node pool.
+   * @param pool - name of the node pool.
+   * @return all datanodes of the specified node pool.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(final String pool) {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    if (nodePools.containsKey(pool)) {
+      return nodePools.get(pool).stream().collect(Collectors.toList());
+    } else {
+      return EMPTY_NODE_LIST;
+    }
+  }
+
+  /**
+   * Get the node pool name if the node has been added to a node pool.
+   * @param datanodeDetails - datanode ID.
+   * @return node pool name if it has been assigned.
+   * null if the node has not been assigned to any node pool yet.
+   * TODO: Put this in an in-memory map if performance is an issue.
+   */
+  @Override
+  public String getNodePool(final DatanodeDetails datanodeDetails)
+      throws SCMException {
+    Preconditions.checkNotNull(datanodeDetails, "node is null");
+    try {
+      byte[]  result = nodePoolStore.get(
+          datanodeDetails.getProtoBufMessage().toByteArray());
+      return result == null ? null : DFSUtil.bytes2String(result);
+    } catch (IOException e) {
+      throw new SCMException("Failed to get node pool for node "
+          + datanodeDetails.toString(), e,
+          SCMException.ResultCodes.IO_EXCEPTION);
+    }
+  }
+
+  /**
+   * Close node pool level db store.
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    nodePoolStore.close();
+  }
+}
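For context, the registration path in SCMNodeManager#register uses this class
roughly as the hypothetical helper below does. Only getNodePool, addNode,
DEFAULT_NODEPOOL and getNodes come from the patch; the helper name and the
printed message are illustrative.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.node.SCMNodePoolManager;

    /** Hypothetical helper mirroring the default-pool assignment above. */
    final class DefaultPoolHelper {
      private DefaultPoolHelper() { }

      /** Adds the datanode to the default pool if it is not pooled yet. */
      static void ensurePooled(SCMNodePoolManager poolManager,
          DatanodeDetails datanode) throws IOException {
        if (poolManager.getNodePool(datanode) == null) {
          poolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, datanode);
        }
        // getNodes never returns null; an unknown pool yields an empty list.
        List<DatanodeDetails> members =
            poolManager.getNodes(SCMNodePoolManager.DEFAULT_NODEPOOL);
        System.out.println("Default pool size: " + members.size());
      }
    }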

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
new file mode 100644
index 0000000..d6a8ad0
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+/**
+ * The node package deals with node management.
+ * <p/>
+ * The node manager takes care of node registrations, removal of nodes and
+ * handling of heartbeats.
+ * <p/>
+ * The node manager maintains statistics that get sent as part of
+ * heartbeats.
+ * <p/>
+ * The container manager polls the node manager to learn the state of
+ * datanodes that it is interested in.
+ * <p/>
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
new file mode 100644
index 0000000..4669e74
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+/*
+ * This package contains StorageContainerManager classes.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
new file mode 100644
index 0000000..8e43528
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Manage Ozone pipelines.
+ */
+public abstract class PipelineManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineManager.class);
+  private final List<PipelineChannel> activePipelineChannels;
+  private final AtomicInteger conduitsIndex;
+
+  public PipelineManager() {
+    activePipelineChannels = new LinkedList<>();
+    conduitsIndex = new AtomicInteger(0);
+  }
+
+  /**
+   * This function is called by the Container Manager while allocating a new
+   * container. The client specifies what kind of replication pipeline is
+   * needed and, based on the replication type in the request, the
+   * appropriate interface is invoked.
+   *
+   * @param containerName Name of the container
+   * @param replicationFactor - Replication Factor
+   * @param replicationType - Replication Type
+   * @return a Pipeline.
+   */
+  public synchronized final Pipeline getPipeline(String containerName,
+      ReplicationFactor replicationFactor, ReplicationType replicationType)
+      throws IOException {
+    /**
+     * In the Ozone world, we have a very simple policy.
+     *
+     * 1. Try to create a pipelineChannel if there are enough free nodes.
+     *
+     * 2. This allows all nodes to be part of a pipelineChannel quickly.
+     *
+     * 3. if there are not enough free nodes, return conduits in a
+     * round-robin fashion.
+     *
+     * TODO: Might have to come up with a better algorithm than this.
+     * Create a new placement policy that returns conduits in round robin
+     * fashion.
+     */
+    PipelineChannel pipelineChannel =
+        allocatePipelineChannel(replicationFactor);
+    if (pipelineChannel != null) {
+      LOG.debug("created new pipelineChannel:{} for container:{}",
+          pipelineChannel.getName(), containerName);
+      activePipelineChannels.add(pipelineChannel);
+    } else {
+      pipelineChannel =
+          findOpenPipelineChannel(replicationType, replicationFactor);
+      if (pipelineChannel != null) {
+        LOG.debug("re-used pipelineChannel:{} for container:{}",
+            pipelineChannel.getName(), containerName);
+      }
+    }
+    if (pipelineChannel == null) {
+      LOG.error("Get pipelineChannel call failed. We are not able to find" +
+              "free nodes or operational pipelineChannel.");
+      return null;
+    } else {
+      return new Pipeline(containerName, pipelineChannel);
+    }
+  }
+
+  protected int getReplicationCount(ReplicationFactor factor) {
+    switch (factor) {
+    case ONE:
+      return 1;
+    case THREE:
+      return 3;
+    default:
+      throw new IllegalArgumentException("Unexpected replication count");
+    }
+  }
+
+  public abstract PipelineChannel allocatePipelineChannel(
+      ReplicationFactor replicationFactor) throws IOException;
+
+  /**
+   * Find a PipelineChannel that is operational.
+   *
+   * @return - PipelineChannel or null
+   */
+  private PipelineChannel findOpenPipelineChannel(
+      ReplicationType type, ReplicationFactor factor) {
+    PipelineChannel pipelineChannel = null;
+    final int sentinel = -1;
+    if (activePipelineChannels.size() == 0) {
+      LOG.error("No operational conduits found. Returning null.");
+      return null;
+    }
+    int startIndex = getNextIndex();
+    int nextIndex = sentinel;
+    for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
+      // Just walk the list in a circular way.
+      PipelineChannel temp =
+          activePipelineChannels
+              .get(nextIndex != sentinel ? nextIndex : startIndex);
+      // if we find an operational pipelineChannel just return that.
+      if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
+          (temp.getFactor() == factor) && (temp.getType() == type)) {
+        pipelineChannel = temp;
+        break;
+      }
+    }
+    return pipelineChannel;
+  }
+
+  /**
+   * Gets the index of the next PipelineChannel to use.
+   *
+   * @return index in the linked list to use.
+   */
+  private int getNextIndex() {
+    return conduitsIndex.incrementAndGet() % activePipelineChannels.size();
+  }
+
+  /**
+   * Creates a pipeline from a specified set of Nodes.
+   * @param pipelineID - Name of the pipeline
+   * @param datanodes - The list of datanodes that make this pipeline.
+   */
+  public abstract void createPipeline(String pipelineID,
+      List<DatanodeDetails> datanodes) throws IOException;
+
+  /**
+   * Close the pipeline with the given pipeline ID.
+   */
+  public abstract void closePipeline(String pipelineID) throws IOException;
+
+  /**
+   * List the members in the pipeline.
+   * @return the list of datanodes
+   */
+  public abstract List<DatanodeDetails> getMembers(String pipelineID)
+      throws IOException;
+
+  /**
+   * Update the datanode list of the pipeline.
+   */
+  public abstract void updatePipeline(String pipelineID,
+      List<DatanodeDetails> newDatanodes) throws IOException;
+}
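For context, findOpenPipelineChannel above walks activePipelineChannels in a
circular fashion starting from the rotating conduitsIndex and returns the first
channel that matches the requested state, factor and type. The generic sketch
below shows the same round-robin-with-filter idea in isolation; the class and
method names are illustrative assumptions and the loop bookkeeping is
simplified compared to the patch, where the matching condition plays the role
of the predicate.

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Predicate;

    /** Illustrative round-robin selector with a filter; not SCM code. */
    final class RoundRobinSelector<T> {
      private final AtomicInteger index = new AtomicInteger(0);

      /**
       * Returns the first element matching the predicate, starting just after
       * the last position handed out, or null after one full lap.
       */
      T next(List<T> items, Predicate<T> isOpen) {
        if (items.isEmpty()) {
          return null;
        }
        // floorMod keeps the start index valid even if the counter overflows.
        int start = Math.floorMod(index.incrementAndGet(), items.size());
        for (int i = 0; i < items.size(); i++) {
          T candidate = items.get((start + i) % items.size());
          if (isOpen.test(candidate)) {
            return candidate;
          }
        }
        return null;
      }
    }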

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
new file mode 100644
index 0000000..f0c9eea
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
+import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sends the request to the right pipeline manager.
+ */
+public class PipelineSelector {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineSelector.class);
+  private final ContainerPlacementPolicy placementPolicy;
+  private final NodeManager nodeManager;
+  private final Configuration conf;
+  private final RatisManagerImpl ratisManager;
+  private final StandaloneManagerImpl standaloneManager;
+  private final long containerSize;
+
+  /**
+   * Constructs a pipeline Selector.
+   *
+   * @param nodeManager - node manager
+   * @param conf - Ozone Config
+   */
+  public PipelineSelector(NodeManager nodeManager, Configuration conf) {
+    this.nodeManager = nodeManager;
+    this.conf = conf;
+    this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
+    this.containerSize = OzoneConsts.GB * this.conf.getInt(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
+    this.standaloneManager =
+        new StandaloneManagerImpl(this.nodeManager, placementPolicy,
+            containerSize);
+    this.ratisManager =
+        new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
+            conf);
+  }
+
+  /**
+   * Translates a list of nodes, ordered such that the first is the leader,
+   * into a corresponding {@link PipelineChannel} object.
+   *
+   * @param nodes - list of datanodes on which we will allocate the container.
+   * The first node in the list will be the leader.
+   * @return PipelineChannel corresponding to the given nodes
+   */
+  public static PipelineChannel newPipelineFromNodes(
+      List<DatanodeDetails> nodes, LifeCycleState state,
+      ReplicationType replicationType, ReplicationFactor replicationFactor,
+      String name) {
+    Preconditions.checkNotNull(nodes);
+    Preconditions.checkArgument(nodes.size() > 0);
+    String leaderId = nodes.get(0).getUuidString();
+    PipelineChannel
+        pipelineChannel = new PipelineChannel(leaderId, state, replicationType,
+        replicationFactor, name);
+    for (DatanodeDetails node : nodes) {
+      pipelineChannel.addMember(node);
+    }
+    return pipelineChannel;
+  }
+
+  /**
+   * Creates an instance of the configured container placement policy
+   * implementation.
+   *
+   * @param nodeManager - SCM node manager.
+   * @param conf - configuration.
+   * @return SCM container placement policy implementation instance.
+   */
+  @SuppressWarnings("unchecked")
+  private static ContainerPlacementPolicy createContainerPlacementPolicy(
+      final NodeManager nodeManager, final Configuration conf) {
+    Class<? extends ContainerPlacementPolicy> implClass =
+        (Class<? extends ContainerPlacementPolicy>) conf.getClass(
+            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+            SCMContainerPlacementRandom.class);
+
+    try {
+      Constructor<? extends ContainerPlacementPolicy> ctor =
+          implClass.getDeclaredConstructor(NodeManager.class,
+              Configuration.class);
+      return ctor.newInstance(nodeManager, conf);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(implClass.getName()
+          + " could not be constructed.", e.getCause());
+    } catch (Exception e) {
+      LOG.error("Unhandled exception occurred, Placement policy will not be " +
+          "functional.");
+      throw new IllegalArgumentException("Unable to load " +
+          "ContainerPlacementPolicy", e);
+    }
+  }
+
+  /**
+   * Returns the pipeline manager for the given replication type.
+   *
+   * @param replicationType - Replication Type Enum.
+   * @return pipeline manager.
+   * @throws IllegalArgumentException if a new pipeline type is added and
+   * this function is not updated to handle it.
+   */
+  private PipelineManager getPipelineManager(ReplicationType replicationType)
+      throws IllegalArgumentException {
+    switch (replicationType) {
+    case RATIS:
+      return this.ratisManager;
+    case STAND_ALONE:
+      return this.standaloneManager;
+    case CHAINED:
+      throw new IllegalArgumentException("Not implemented yet");
+    default:
+      throw new IllegalArgumentException("Unexpected enum found. Does not" +
+          " know how to handle " + replicationType.toString());
+    }
+
+  }
+
+  /**
+   * This function is called by the Container Manager while allocating a new
+   * container. The client specifies what kind of replication pipeline is
+   * needed, and based on the replication type in the request the appropriate
+   * pipeline manager is invoked.
+   */
+
+  public Pipeline getReplicationPipeline(ReplicationType replicationType,
+      HddsProtos.ReplicationFactor replicationFactor, String containerName)
+      throws IOException {
+    PipelineManager manager = getPipelineManager(replicationType);
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Getting replication pipeline for {} : Replication {}",
+        containerName, replicationFactor.toString());
+    return manager.
+        getPipeline(containerName, replicationFactor, replicationType);
+  }
+
+  /**
+   * Creates a pipeline from a specified set of Nodes.
+   */
+
+  public void createPipeline(ReplicationType replicationType, String
+      pipelineID, List<DatanodeDetails> datanodes) throws IOException {
+    PipelineManager manager = getPipelineManager(replicationType);
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
+        datanodes.stream().map(DatanodeDetails::toString)
+            .collect(Collectors.joining(",")));
+    manager.createPipeline(pipelineID, datanodes);
+  }
+
+  /**
+   * Closes the pipeline with the given pipeline ID.
+   */
+
+  public void closePipeline(ReplicationType replicationType, String
+      pipelineID) throws IOException {
+    PipelineManager manager = getPipelineManager(replicationType);
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Closing pipeline. pipelineID: {}", pipelineID);
+    manager.closePipeline(pipelineID);
+  }
+
+  /**
+   * Lists the members of the pipeline.
+   */
+
+  public List<DatanodeDetails> getDatanodes(ReplicationType replicationType,
+      String pipelineID) throws IOException {
+    PipelineManager manager = getPipelineManager(replicationType);
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Getting data nodes from pipeline : {}", pipelineID);
+    return manager.getMembers(pipelineID);
+  }
+
+  /**
+   * Updates the list of datanodes in the pipeline.
+   */
+
+  public void updateDatanodes(ReplicationType replicationType, String
+      pipelineID, List<DatanodeDetails> newDatanodes) throws IOException {
+    PipelineManager manager = getPipelineManager(replicationType);
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID,
+        newDatanodes.stream().map(DatanodeDetails::toString)
+            .collect(Collectors.joining(",")));
+    manager.updatePipeline(pipelineID, newDatanodes);
+  }
+}
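
For orientation, here is a minimal sketch of how the selector above is meant to be wired up. It is not part of the patch: it assumes an existing NodeManager instance, the container name is illustrative, and it only uses classes that appear elsewhere in this change.

    // Minimal wiring sketch, not part of the patch. Assumes an existing
    // NodeManager instance; the container name is illustrative.
    public static Pipeline allocateRatisPipeline(NodeManager nodeManager)
        throws IOException {
      OzoneConfiguration conf = new OzoneConfiguration();
      // Make the default placement policy explicit; any implementation with a
      // (NodeManager, Configuration) constructor can be plugged in here.
      conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
          SCMContainerPlacementRandom.class, ContainerPlacementPolicy.class);
      PipelineSelector selector = new PipelineSelector(nodeManager, conf);
      return selector.getReplicationPipeline(
          ReplicationType.RATIS, ReplicationFactor.THREE, "container-1");
    }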

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
new file mode 100644
index 0000000..ea24c58
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines;
+/**
+ Ozone supports the notion of different kinds of pipelines.
+ That means that we can have a replication pipeline built on
+ Ratis, Standalone or some other protocol. All pipeline managers,
+ the entities in charge of pipelines, reside in this package.
+
+ Here is the high-level architecture.
+
+ 1. A pipeline selector class is instantiated in the Container Manager class.
+
+ 2. A client, when creating a container, specifies what kind of replication
+ type it wants to use. We support two types now: Ratis and StandAlone.
+
+ 3. Based on the replication type, the pipeline selector class asks the
+ corresponding pipeline manager for a pipeline.
+
+ 4. Clients can either specify the set of nodes in the pipeline or rely on
+ the pipeline manager to select the datanodes if they are not specified.
+ */
\ No newline at end of file
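
To tie the numbered flow together, a small hedged sketch of step 4 with an explicit node list; the selector and node manager instances are assumed to exist, and the pipeline name is illustrative, not part of the patch.

    // Sketch of steps 2-4 with an explicit node list; not part of the patch.
    // "selector" and "nodeManager" are assumed to exist, and "pipeline-1" is
    // an illustrative pipeline name.
    List<DatanodeDetails> healthy = nodeManager.getNodes(NodeState.HEALTHY);
    selector.createPipeline(ReplicationType.STAND_ALONE, "pipeline-1",
        healthy.subList(0, 1));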

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
new file mode 100644
index 0000000..089a137
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines.ratis;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Implementation of {@link PipelineManager}.
+ *
+ * TODO : Introduce a state machine.
+ */
+public class RatisManagerImpl extends PipelineManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RatisManagerImpl.class);
+  private static final String PREFIX = "Ratis-";
+  private final Configuration conf;
+  private final NodeManager nodeManager;
+  private final Set<DatanodeDetails> ratisMembers;
+
+  /**
+   * Constructs a Ratis Pipeline Manager.
+   *
+   * @param nodeManager - SCM node manager.
+   * @param placementPolicy - container placement policy.
+   * @param size - container size.
+   * @param conf - Ozone configuration.
+   */
+  public RatisManagerImpl(NodeManager nodeManager,
+      ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
+    super();
+    this.conf = conf;
+    this.nodeManager = nodeManager;
+    ratisMembers = new HashSet<>();
+  }
+
+  /**
+   * Allocates a new Ratis PipelineChannel from the free nodes.
+   *
+   * @param factor - One or Three
+   * @return PipelineChannel.
+   */
+  public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
+    List<DatanodeDetails> newNodesList = new LinkedList<>();
+    List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+    int count = getReplicationCount(factor);
+    // TODO: Add Raft state to the nodes so that we can query the datanodes
+    // for this information and skip nodes already in a pipeline, instead of
+    // maintaining a set here.
+    for (DatanodeDetails datanode : datanodes) {
+      Preconditions.checkNotNull(datanode);
+      if (!ratisMembers.contains(datanode)) {
+        newNodesList.add(datanode);
+        if (newNodesList.size() == count) {
+          // once a datanode has been added to a pipeline, exclude it from
+          // further allocations
+          ratisMembers.addAll(newNodesList);
+          LOG.info("Allocating a new ratis pipelineChannel of size: {}", 
count);
+          // Start all channel names with "Ratis", easy to grep the logs.
+          String conduitName = PREFIX +
+              UUID.randomUUID().toString().substring(PREFIX.length());
+          PipelineChannel pipelineChannel =
+              PipelineSelector.newPipelineFromNodes(newNodesList,
+              LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName);
+          Pipeline pipeline =
+              new Pipeline("setup", pipelineChannel);
+          try (XceiverClientRatis client =
+              XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+            client.createPipeline(pipeline.getPipelineName(), newNodesList);
+          } catch (IOException e) {
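+            // Pipeline setup on the datanodes failed; give up on this channel
+            // so the caller treats the allocation as unsuccessful.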
+            return null;
+          }
+          return pipelineChannel;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a pipeline from a specified set of Nodes.
+   *
+   * @param pipelineID - Name of the pipeline
+   * @param datanodes - The list of datanodes that make this pipeline.
+   */
+  @Override
+  public void createPipeline(String pipelineID,
+                             List<DatanodeDetails> datanodes) {
+
+  }
+
+  /**
+   * Closes the pipeline with the given pipeline ID.
+   *
+   * @param pipelineID - ID of the pipeline to close.
+   */
+  @Override
+  public void closePipeline(String pipelineID) throws IOException {
+
+  }
+
+  /**
+   * Lists the members of the pipeline.
+   *
+   * @param pipelineID - ID of the pipeline.
+   * @return the list of datanodes in the pipeline.
+   */
+  @Override
+  public List<DatanodeDetails> getMembers(String pipelineID)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Updates the datanode list of the pipeline.
+   *
+   * @param pipelineID - ID of the pipeline.
+   * @param newDatanodes - new list of datanodes.
+   */
+  @Override
+  public void updatePipeline(String pipelineID,
+                             List<DatanodeDetails> newDatanodes)
+      throws IOException {
+
+  }
+}
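
To make the allocation contract above concrete, a short usage sketch; it is not part of the patch, and nodeManager, placementPolicy, containerSize and conf are assumed to exist.

    // Usage sketch, not part of the patch. nodeManager, placementPolicy,
    // containerSize and conf are assumed to exist.
    RatisManagerImpl ratisManager = new RatisManagerImpl(
        nodeManager, placementPolicy, containerSize, conf);
    PipelineChannel channel =
        ratisManager.allocatePipelineChannel(ReplicationFactor.THREE);
    if (channel == null) {
      // Either fewer than three healthy, unused datanodes were available, or
      // the Ratis createPipeline call to the datanodes failed.
      LOG.warn("Could not allocate a Ratis pipeline channel");
    } else {
      // Channel names start with "Ratis-", which makes them easy to grep.
      Pipeline pipeline = new Pipeline("container-1", channel);
    }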

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
new file mode 100644
index 0000000..2970fb3
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines.ratis;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
new file mode 100644
index 0000000..8268329
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines.standalone;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Standalone pipeline manager implementation, used to prove that the
+ * pluggable pipeline interface works with the current tests.
+ */
+public class StandaloneManagerImpl extends PipelineManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StandaloneManagerImpl.class);
+  private final NodeManager nodeManager;
+  private final ContainerPlacementPolicy placementPolicy;
+  private final long containerSize;
+  private final Set<DatanodeDetails> standAloneMembers;
+
+  /**
+   * Constructor for Standalone Node Manager Impl.
+   * @param nodeManager - Node Manager.
+   * @param placementPolicy - Placement Policy
+   * @param containerSize - Container Size.
+   */
+  public StandaloneManagerImpl(NodeManager nodeManager,
+      ContainerPlacementPolicy placementPolicy, long containerSize) {
+    super();
+    this.nodeManager = nodeManager;
+    this.placementPolicy = placementPolicy;
+    this.containerSize =  containerSize;
+    this.standAloneMembers = new HashSet<>();
+  }
+
+
+  /**
+   * Allocates a new standalone PipelineChannel from the free nodes.
+   *
+   * @param factor - One
+   * @return PipelineChannel.
+   */
+  public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
+    List<DatanodeDetails> newNodesList = new LinkedList<>();
+    List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+    int count = getReplicationCount(factor);
+    for (DatanodeDetails datanode : datanodes) {
+      Preconditions.checkNotNull(datanode);
+      if (!standAloneMembers.contains(datanode)) {
+        newNodesList.add(datanode);
+        if (newNodesList.size() == count) {
+          // once a datanode has been added to a pipeline, exclude it from
+          // further allocations
+          standAloneMembers.addAll(newNodesList);
+          LOG.info("Allocating a new standalone pipeline channel of size: {}",
+              count);
+          String channelName =
+              "SA-" + UUID.randomUUID().toString().substring(3);
+          return PipelineSelector.newPipelineFromNodes(newNodesList,
+              LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
+              ReplicationFactor.ONE, channelName);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a pipeline from a specified set of Nodes.
+   *
+   * @param pipelineID - Name of the pipeline
+   * @param datanodes - The list of datanodes that make this pipeline.
+   */
+  @Override
+  public void createPipeline(String pipelineID,
+                             List<DatanodeDetails> datanodes) {
+    //return newPipelineFromNodes(datanodes, pipelineID);
+  }
+
+  /**
+   * Closes the pipeline with the given pipeline ID.
+   *
+   * @param pipelineID - ID of the pipeline to close.
+   */
+  @Override
+  public void closePipeline(String pipelineID) throws IOException {
+
+  }
+
+  /**
+   * Lists the members of the pipeline.
+   *
+   * @param pipelineID - ID of the pipeline.
+   * @return the list of datanodes in the pipeline.
+   */
+  @Override
+  public List<DatanodeDetails> getMembers(String pipelineID)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Updates the datanode list of the pipeline.
+   *
+   * @param pipelineID - ID of the pipeline.
+   * @param newDatanodes - new list of datanodes.
+   */
+  @Override
+  public void updatePipeline(String pipelineID, List<DatanodeDetails>
+      newDatanodes) throws IOException {
+
+  }
+}
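
A brief sketch of how the exclusion set above behaves; not part of the patch, and it assumes the node manager reports several healthy datanodes while the other variables are illustrative.

    // Behaviour sketch for the exclusion set, not part of the patch; assumes
    // nodeManager reports several healthy datanodes.
    StandaloneManagerImpl standaloneManager = new StandaloneManagerImpl(
        nodeManager, placementPolicy, containerSize);
    PipelineChannel first =
        standaloneManager.allocatePipelineChannel(ReplicationFactor.ONE);
    PipelineChannel second =
        standaloneManager.allocatePipelineChannel(ReplicationFactor.ONE);
    // first and second are backed by different datanodes, because
    // standAloneMembers excludes nodes already placed in a channel; once all
    // healthy nodes are consumed, allocatePipelineChannel returns null.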

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
new file mode 100644
index 0000000..b2c3ca40
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines.standalone;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
new file mode 100644
index 0000000..4944017
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ratis;
+
+/**
+ * This package contains classes related to Apache Ratis for SCM.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/webapps/scm/index.html
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/webapps/scm/index.html 
b/hadoop-hdds/server-scm/src/main/webapps/scm/index.html
new file mode 100644
index 0000000..3407f51
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/webapps/scm/index.html
@@ -0,0 +1,76 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd";>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<html lang="en">
+<head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <!-- The above 3 meta tags *must* come first in the head; any other
+         head content must come *after* these tags -->
+    <meta name="description" content="HDFS Storage Container Manager">
+
+    <title>HDFS Storage Container Manager</title>
+
+    <link href="static/bootstrap-3.0.2/css/bootstrap.min.css" rel="stylesheet">
+    <link href="static/hadoop.css" rel="stylesheet">
+    <link href="static/nvd3-1.8.5.min.css" rel="stylesheet">
+
+    <link href="static/ozone.css" rel="stylesheet">
+
+</head>
+
+<body ng-app="scm">
+
+<header class="navbar navbar-inverse navbar-fixed-top bs-docs-nav">
+    <div class="container-fluid">
+        <div class="navbar-header">
+            <button type="button" class="navbar-toggle collapsed"
+                    data-toggle="collapse" data-target="#navbar"
+                    aria-expanded="false" aria-controls="navbar">
+                <span class="sr-only">Toggle navigation</span>
+                <span class="icon-bar"></span>
+                <span class="icon-bar"></span>
+                <span class="icon-bar"></span>
+            </button>
+            <a class="navbar-brand" href="#">HDFS SCM</a>
+        </div>
+
+
+        <navmenu
+                metrics="{ 'Rpc metrics' : '#!/metrics/rpc'}"></navmenu>
+
+
+    </div>
+</header>
+
+<div class="container-fluid" style="margin: 12pt">
+
+    <ng-view></ng-view>
+
+</div><!-- /.container -->
+
+<script src="static/jquery-1.10.2.min.js"></script>
+<script src="static/angular-1.6.4.min.js"></script>
+<script src="static/angular-route-1.6.4.min.js"></script>
+<script src="static/d3-3.5.17.min.js"></script>
+<script src="static/nvd3-1.8.5.min.js"></script>
+<script src="static/angular-nvd3-1.0.9.min.js"></script>
+<script src="static/ozone.js"></script>
+<script src="scm.js"></script>
+<script src="static/bootstrap-3.0.2/js/bootstrap.min.js"></script>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/webapps/scm/main.html
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/webapps/scm/main.html 
b/hadoop-hdds/server-scm/src/main/webapps/scm/main.html
new file mode 100644
index 0000000..2666f81
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/webapps/scm/main.html
@@ -0,0 +1,20 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+<overview>
+    <scm-overview>
+    </scm-overview>
+</overview>

