http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
new file mode 100644
index 0000000..7bbe2ef
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+/*
+ This package contains routines that manage replication of a container. This
+ relies on container reports to understand the replication level of a
+ container - UnderReplicated, Replicated, OverReplicated -- and manages the
+ replication level based on that.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
new file mode 100644
index 0000000..288fa2d
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.container.states;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
+
+/**
+ * Each Attribute that we manage for a container is maintained as a map.
+ * <p>
+ * Currently we manage the following attributes for a container.
+ * <p>
+ * 1. StateMap - LifeCycleState -> Set of ContainerIDs
+ * 2. TypeMap  - ReplicationType -> Set of ContainerIDs
+ * 3. OwnerMap - OwnerNames -> Set of ContainerIDs
+ * 4. FactorMap - ReplicationFactor -> Set of ContainerIDs
+ * <p>
+ * This means that for a cluster size of 750 PB -- we will have around 150
+ * Million containers, if we assume 5GB average container size.
+ * <p>
+ * That implies that these maps will take around 2/3 GB of RAM which will be
+ * pinned down in the SCM. This is deemed acceptable since we can tune the
+ * container size --say we make it 10GB average size, then we can deal with a
+ * cluster size of 1.5 exa bytes with the same metadata in SCMs memory.
+ * <p>
+ * Please note: **This class is not thread safe**. This used to be thread safe,
+ * while bench marking we found that ContainerStateMap would be taking 5
+ * locks for a single container insert. If we remove locks in this class,
+ * then we are able to perform about 540K operations per second, with the
+ * locks in this class it goes down to 246K operations per second. Hence we
+ * are going to rely on ContainerStateMap locks to maintain consistency of
+ * data in these classes too, since ContainerAttribute is only used by
+ * ContainerStateMap class.
+ */
+public class ContainerAttribute<T> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAttribute.class);
+
+  private final Map<T, NavigableSet<ContainerID>> attributeMap;
+  private static final NavigableSet<ContainerID> EMPTY_SET =  Collections
+      .unmodifiableNavigableSet(new TreeSet<>());
+
+  /**
+   * Creates a Container Attribute map from an existing Map.
+   *
+   * @param attributeMap - AttributeMap
+   */
+  public ContainerAttribute(Map<T, NavigableSet<ContainerID>> attributeMap) {
+    this.attributeMap = attributeMap;
+  }
+
+  /**
+   * Create an empty Container Attribute map.
+   */
+  public ContainerAttribute() {
+    this.attributeMap = new HashMap<>();
+  }
+
+  /**
+   * Insert or update the value in the Attribute map.
+   *
+   * @param key - The key to the set where the ContainerID should exist.
+   * @param value - Actual Container ID.
+   * @throws SCMException - on Error
+   */
+  public boolean insert(T key, ContainerID value) throws SCMException {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    if (attributeMap.containsKey(key)) {
+      if (attributeMap.get(key).add(value)) {
+        return true; //we inserted the value as it doesn’t exist in the set.
+      } else { // Failure indicates that this ContainerID exists in the Set
+        if (!attributeMap.get(key).remove(value)) {
+          LOG.error("Failure to remove the object from the Map.Key:{}, " +
+              "ContainerID: {}", key, value);
+          throw new SCMException("Failure to remove the object from the Map",
+              FAILED_TO_CHANGE_CONTAINER_STATE);
+        }
+        attributeMap.get(key).add(value);
+        return true;
+      }
+    } else {
+      // This key does not exist, we need to allocate this key in the map.
+      // TODO: Replace TreeSet with FoldedTreeSet from HDFS Utils.
+      // Skipping for now, since FoldedTreeSet does not have implementations
+      // for headSet and TailSet. We need those calls.
+      this.attributeMap.put(key, new TreeSet<>());
+      // This should not fail, we just allocated this object.
+      attributeMap.get(key).add(value);
+      return true;
+    }
+  }
+
+  /**
+   * Returns true if have this bucket in the attribute map.
+   *
+   * @param key - Key to lookup
+   * @return true if we have the key
+   */
+  public boolean hasKey(T key) {
+    Preconditions.checkNotNull(key);
+    return this.attributeMap.containsKey(key);
+  }
+
+  /**
+   * Returns true if we have the key and the containerID in the bucket.
+   *
+   * @param key - Key to the bucket
+   * @param id - container ID that we want to lookup
+   * @return true or false
+   */
+  public boolean hasContainerID(T key, ContainerID id) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(id);
+
+    return this.attributeMap.containsKey(key) &&
+        this.attributeMap.get(key).contains(id);
+  }
+
+  /**
+   * Returns true if we have the key and the containerID in the bucket.
+   *
+   * @param key - Key to the bucket
+   * @param id - container ID that we want to lookup
+   * @return true or false
+   */
+  public boolean hasContainerID(T key, int id) {
+    return hasContainerID(key, ContainerID.valueof(id));
+  }
+
+  /**
+   * Clears all entries for this key type.
+   *
+   * @param key - Key that identifies the Set.
+   */
+  public void clearSet(T key) {
+    Preconditions.checkNotNull(key);
+
+    if (attributeMap.containsKey(key)) {
+      attributeMap.get(key).clear();
+    } else {
+      LOG.debug("key: {} does not exist in the attributeMap", key);
+    }
+  }
+
+  /**
+   * Removes a container ID from the set pointed by the key.
+   *
+   * @param key - key to identify the set.
+   * @param value - Container ID
+   */
+  public boolean remove(T key, ContainerID value) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    if (attributeMap.containsKey(key)) {
+      if (!attributeMap.get(key).remove(value)) {
+        LOG.debug("ContainerID: {} does not exist in the set pointed by " +
+            "key:{}", value, key);
+        return false;
+      }
+      return true;
+    } else {
+      LOG.debug("key: {} does not exist in the attributeMap", key);
+      return false;
+    }
+  }
+
+  /**
+   * Returns the collection that maps to the given key.
+   *
+   * @param key - Key to the bucket.
+   * @return Underlying Set in immutable form.
+   */
+  public NavigableSet<ContainerID> getCollection(T key) {
+    Preconditions.checkNotNull(key);
+
+    if (this.attributeMap.containsKey(key)) {
+      return Collections.unmodifiableNavigableSet(this.attributeMap.get(key));
+    }
+    LOG.debug("No such Key. Key {}", key);
+    return EMPTY_SET;
+  }
+
+  /**
+   * Moves a ContainerID from one bucket to another.
+   *
+   * @param currentKey - Current Key
+   * @param newKey - newKey
+   * @param value - ContainerID
+   * @throws SCMException on Error
+   */
+  public void update(T currentKey, T newKey, ContainerID value)
+      throws SCMException {
+    Preconditions.checkNotNull(currentKey);
+    Preconditions.checkNotNull(newKey);
+
+    boolean removed = false;
+    try {
+      removed = remove(currentKey, value);
+      if (!removed) {
+        throw new SCMException("Unable to find key in the current key bucket",
+            FAILED_TO_CHANGE_CONTAINER_STATE);
+      }
+      insert(newKey, value);
+    } catch (SCMException ex) {
+      // if we removed the key, insert it back to original bucket, since the
+      // next insert failed.
+      LOG.error("error in update.", ex);
+      if (removed) {
+        insert(currentKey, value);
+        LOG.trace("reinserted the removed key. {}", currentKey);
+      }
+      throw ex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerState.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerState.java
new file mode 100644
index 0000000..1dac36e
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.states;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Class that acts as the container state.
+ */
+public class ContainerState {
+  private final HddsProtos.ReplicationType type;
+  private final String owner;
+  private final HddsProtos.ReplicationFactor replicationFactor;
+
+  /**
+   * Constructs a Container Key.
+   *
+   * @param owner - Container Owners
+   * @param type - Replication Type.
+   * @param factor - Replication Factors
+   */
+  public ContainerState(String owner, HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor) {
+    this.type = type;
+    this.owner = owner;
+    this.replicationFactor = factor;
+  }
+
+
+  public HddsProtos.ReplicationType getType() {
+    return type;
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public HddsProtos.ReplicationFactor getFactor() {
+    return replicationFactor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ContainerState that = (ContainerState) o;
+
+    return new EqualsBuilder()
+        .append(type, that.type)
+        .append(owner, that.owner)
+        .append(replicationFactor, that.replicationFactor)
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(137, 757)
+        .append(type)
+        .append(owner)
+        .append(replicationFactor)
+        .toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerKey{" +
+        ", type=" + type +
+        ", owner=" + owner +
+        ", replicationFactor=" + replicationFactor +
+        '}';
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
new file mode 100644
index 0000000..48c6423
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.states;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .CONTAINER_EXISTS;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_FIND_CONTAINER;
+
+/**
+ * Container State Map acts like a unified map for various attributes that are
+ * used to select containers when we need allocated blocks.
+ * <p>
+ * This class provides the ability to query 4 classes of attributes. They are
+ * <p>
+ * 1. LifeCycleStates - LifeCycle States of container describe in which state
+ * a container is. For example, a container needs to be in Open State for a
+ * client to able to write to it.
+ * <p>
+ * 2. Owners - Each instance of Name service, for example, Namenode of HDFS or
+ * Key Space Manager (KSM) of Ozone or CBlockServer --  is an owner. It is
+ * possible to have many KSMs for a Ozone cluster and only one SCM. But SCM
+ * keeps the data from each KSM in separate bucket, never mixing them. To
+ * write data, often we have to find all open containers for a specific owner.
+ * <p>
+ * 3. ReplicationType - The clients are allowed to specify what kind of
+ * replication pipeline they want to use. Each Container exists on top of a
+ * pipeline, so we need to get ReplicationType that is specified by the user.
+ * <p>
+ * 4. ReplicationFactor - The replication factor represents how many copies
+ * of data should be made, right now we support 2 different types, ONE
+ * Replica and THREE Replica. User can specify how many copies should be made
+ * for a ozone key.
+ * <p>
+ * The most common access pattern of this class is to select a container based
+ * on all these parameters, for example, when allocating a block we will
+ * select a container that belongs to user1, with Ratis replication which can
+ * make 3 copies of data. The fact that we will look for open containers by
+ * default and if we cannot find them we will add new containers.
+ */
+public class ContainerStateMap {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStateMap.class);
+
+  private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
+  private final ContainerAttribute<String> ownerMap;
+  private final ContainerAttribute<ReplicationFactor> factorMap;
+  private final ContainerAttribute<ReplicationType> typeMap;
+
+  private final Map<ContainerID, ContainerInfo> containerMap;
+  private final static NavigableSet<ContainerID> EMPTY_SET  =
+      Collections.unmodifiableNavigableSet(new TreeSet<>());
+
+  // Container State Map lock should be held before calling into
+  // Update ContainerAttributes. The consistency of ContainerAttributes is
+  // protected by this lock.
+  private final AutoCloseableLock autoLock;
+
+  /**
+   * Create a ContainerStateMap.
+   */
+  public ContainerStateMap() {
+    lifeCycleStateMap = new ContainerAttribute<>();
+    ownerMap = new ContainerAttribute<>();
+    factorMap = new ContainerAttribute<>();
+    typeMap = new ContainerAttribute<>();
+    containerMap = new HashMap<>();
+    autoLock = new AutoCloseableLock();
+//        new InstrumentedLock(getClass().getName(), LOG,
+//            new ReentrantLock(),
+//            1000,
+//            300));
+  }
+
+  /**
+   * Adds a ContainerInfo Entry in the ContainerStateMap.
+   *
+   * @param info - container info
+   * @throws SCMException - throws if create failed.
+   */
+  public void addContainer(ContainerInfo info)
+      throws SCMException {
+    Preconditions.checkNotNull(info, "Container Info cannot be null");
+    Preconditions.checkNotNull(info.getPipeline(), "Pipeline cannot be null");
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      ContainerID id = ContainerID.valueof(info.getContainerID());
+      if (containerMap.putIfAbsent(id, info) != null) {
+        LOG.debug("Duplicate container ID detected. {}", id);
+        throw new
+            SCMException("Duplicate container ID detected.",
+            CONTAINER_EXISTS);
+      }
+
+      lifeCycleStateMap.insert(info.getState(), id);
+      ownerMap.insert(info.getOwner(), id);
+      factorMap.insert(info.getPipeline().getFactor(), id);
+      typeMap.insert(info.getPipeline().getType(), id);
+      LOG.trace("Created container with {} successfully.", id);
+    }
+  }
+
+  /**
+   * Returns the latest state of Container from SCM's Container State Map.
+   *
+   * @param info - ContainerInfo
+   * @return ContainerInfo
+   */
+  public ContainerInfo getContainerInfo(ContainerInfo info) {
+    return getContainerInfo(info.getContainerID());
+  }
+
+  /**
+   * Returns the latest state of Container from SCM's Container State Map.
+   *
+   * @param containerID - int
+   * @return container info, if found.
+   */
+  public ContainerInfo getContainerInfo(long containerID) {
+    ContainerID id = new ContainerID(containerID);
+    return containerMap.get(id);
+  }
+
+  /**
+   * Returns the full container Map.
+   *
+   * @return - Map
+   */
+  public Map<ContainerID, ContainerInfo> getContainerMap() {
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return Collections.unmodifiableMap(containerMap);
+    }
+  }
+
+  /**
+   * Just update the container State.
+   * @param info ContainerInfo.
+   */
+  public void updateContainerInfo(ContainerInfo info) throws SCMException {
+    Preconditions.checkNotNull(info);
+    ContainerInfo currentInfo = null;
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      currentInfo = containerMap.get(
+          ContainerID.valueof(info.getContainerID()));
+
+      if (currentInfo == null) {
+        throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+      }
+      containerMap.put(info.containerID(), info);
+    }
+  }
+
+  /**
+   * Update the State of a container.
+   *
+   * @param info - ContainerInfo
+   * @param currentState - CurrentState
+   * @param newState - NewState.
+   * @throws SCMException - in case of failure.
+   */
+  public void updateState(ContainerInfo info, LifeCycleState currentState,
+      LifeCycleState newState) throws SCMException {
+    Preconditions.checkNotNull(currentState);
+    Preconditions.checkNotNull(newState);
+
+    ContainerID id = new ContainerID(info.getContainerID());
+    ContainerInfo currentInfo = null;
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      currentInfo = containerMap.get(id);
+
+      if (currentInfo == null) {
+        throw new
+            SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+      }
+      // We are updating two places before this update is done, these can
+      // fail independently, since the code needs to handle it.
+
+      // We update the attribute map, if that fails it will throw an exception,
+      // so no issues, if we are successful, we keep track of the fact that we
+      // have updated the lifecycle state in the map, and update the container
+      // state. If this second update fails, we will attempt to roll back the
+      // earlier change we did. If the rollback fails, we can be in an
+      // inconsistent state,
+
+      info.setState(newState);
+      containerMap.put(id, info);
+      lifeCycleStateMap.update(currentState, newState, id);
+      LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+          "{}", id, currentState, newState);
+    } catch (SCMException ex) {
+      LOG.error("Unable to update the container state. {}", ex);
+      // we need to revert the change in this attribute since we are not
+      // able to update the hash table.
+      LOG.info("Reverting the update to lifecycle state. Moving back to " +
+              "old state. Old = {}, Attempted state = {}", currentState,
+          newState);
+
+      containerMap.put(id, currentInfo);
+
+      // if this line throws, the state map can be in an inconsistent
+      // state, since we will have modified the attribute by the
+      // container state will not in sync since we were not able to put
+      // that into the hash table.
+      lifeCycleStateMap.update(newState, currentState, id);
+
+      throw new SCMException("Updating the container map failed.", ex,
+          FAILED_TO_CHANGE_CONTAINER_STATE);
+    }
+  }
+
+  /**
+   * Returns A list of containers owned by a name service.
+   *
+   * @param ownerName - Name of the NameService.
+   * @return - NavigableSet of ContainerIDs.
+   */
+  NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
+    Preconditions.checkNotNull(ownerName);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return ownerMap.getCollection(ownerName);
+    }
+  }
+
+  /**
+   * Returns Containers in the System by the Type.
+   *
+   * @param type - Replication type -- StandAlone, Ratis etc.
+   * @return NavigableSet
+   */
+  NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
+    Preconditions.checkNotNull(type);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return typeMap.getCollection(type);
+    }
+  }
+
+  /**
+   * Returns Containers by replication factor.
+   *
+   * @param factor - Replication Factor.
+   * @return NavigableSet.
+   */
+  NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
+    Preconditions.checkNotNull(factor);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return factorMap.getCollection(factor);
+    }
+  }
+
+  /**
+   * Returns Containers by State.
+   *
+   * @param state - State - Open, Closed etc.
+   * @return List of containers by state.
+   */
+  NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
+    Preconditions.checkNotNull(state);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return lifeCycleStateMap.getCollection(state);
+    }
+  }
+
+  /**
+   * Gets the containers that matches the  following filters.
+   *
+   * @param state - LifeCycleState
+   * @param owner - Owner
+   * @param factor - Replication Factor
+   * @param type - Replication Type
+   * @return ContainerInfo or Null if not container satisfies the criteria.
+   */
+  public NavigableSet<ContainerID> getMatchingContainerIDs(
+      LifeCycleState state, String owner,
+      ReplicationFactor factor, ReplicationType type) {
+
+    Preconditions.checkNotNull(state, "State cannot be null");
+    Preconditions.checkNotNull(owner, "Owner cannot be null");
+    Preconditions.checkNotNull(factor, "Factor cannot be null");
+    Preconditions.checkNotNull(type, "Type cannot be null");
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+
+      // If we cannot meet any one condition we return EMPTY_SET immediately.
+      // Since when we intersect these sets, the result will be empty if any
+      // one is empty.
+      NavigableSet<ContainerID> stateSet =
+          lifeCycleStateMap.getCollection(state);
+      if (stateSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+      NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner);
+      if (ownerSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+      NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor);
+      if (factorSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+      NavigableSet<ContainerID> typeSet = typeMap.getCollection(type);
+      if (typeSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+
+      // if we add more constraints we will just add those sets here..
+      NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
+          ownerSet, factorSet, typeSet);
+
+      NavigableSet<ContainerID> currentSet = sets[0];
+      // We take the smallest set and intersect against the larger sets. This
+      // allows us to reduce the lookups to the least possible number.
+      for (int x = 1; x < sets.length; x++) {
+        currentSet = intersectSets(currentSet, sets[x]);
+      }
+      return currentSet;
+    }
+  }
+
+  /**
+   * Calculates the intersection between sets and returns a new set.
+   *
+   * @param smaller - First Set
+   * @param bigger - Second Set
+   * @return resultSet which is the intersection of these two sets.
+   */
+  private NavigableSet<ContainerID> intersectSets(
+      NavigableSet<ContainerID> smaller,
+      NavigableSet<ContainerID> bigger) {
+    Preconditions.checkState(smaller.size() <= bigger.size(),
+        "This function assumes the first set is lesser or equal to second " +
+            "set");
+    NavigableSet<ContainerID> resultSet = new TreeSet<>();
+    for (ContainerID id : smaller) {
+      if (bigger.contains(id)) {
+        resultSet.add(id);
+      }
+    }
+    return resultSet;
+  }
+
+  /**
+   * Sorts a list of Sets based on Size. This is useful when we are
+   * intersecting the sets.
+   *
+   * @param sets - varagrs of sets
+   * @return Returns a sorted array of sets based on the size of the set.
+   */
+  @SuppressWarnings("unchecked")
+  private NavigableSet<ContainerID>[] sortBySize(
+      NavigableSet<ContainerID>... sets) {
+    for (int x = 0; x < sets.length - 1; x++) {
+      for (int y = 0; y < sets.length - x - 1; y++) {
+        if (sets[y].size() > sets[y + 1].size()) {
+          NavigableSet temp = sets[y];
+          sets[y] = sets[y + 1];
+          sets[y + 1] = temp;
+        }
+      }
+    }
+    return sets;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java
new file mode 100644
index 0000000..cf20f39
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Container States management package.
+ */
+package org.apache.hadoop.hdds.scm.container.states;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
new file mode 100644
index 0000000..227df3c
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.exceptions;
+
+import java.io.IOException;
+
/**
 * Exception thrown by SCM.
 * <p>
 * Carries a {@link ResultCodes} value so callers can decode the failure
 * category programmatically instead of parsing the message text.
 */
public class SCMException extends IOException {
  // Machine-readable failure category; set by every constructor.
  // NOTE(review): none of the constructors null-check this — callers of
  // getResult() should not assume non-null without verifying call sites.
  private final ResultCodes result;

  /**
   * Constructs an {@code SCMException} with {@code null}
   * as its error detail message.
   *
   * @param result error code describing the failure category.
   */
  public SCMException(ResultCodes result) {
    this.result = result;
  }

  /**
   * Constructs an {@code SCMException} with the specified detail message.
   *
   * @param message The detail message (which is saved for later retrieval by
   * the
   * {@link #getMessage()} method)
   * @param result error code describing the failure category.
   */
  public SCMException(String message, ResultCodes result) {
    super(message);
    this.result = result;
  }

  /**
   * Constructs an {@code SCMException} with the specified detail message
   * and cause.
   * <p>
   * <p> Note that the detail message associated with {@code cause} is
   * <i>not</i> automatically incorporated into this exception's detail
   * message.
   *
   * @param message The detail message (which is saved for later retrieval by
   * the
   * {@link #getMessage()} method)
   * @param cause The cause (which is saved for later retrieval by the {@link
   * #getCause()} method).  (A null value is permitted, and indicates that the
   * cause is nonexistent or unknown.)
   * @param result error code describing the failure category.
   * @since 1.6
   */
  public SCMException(String message, Throwable cause, ResultCodes result) {
    super(message, cause);
    this.result = result;
  }

  /**
   * Constructs an {@code SCMException} with the specified cause and a
   * detail message of {@code (cause==null ? null : cause.toString())}
   * (which typically contains the class and detail message of {@code cause}).
   * This constructor is useful for IO exceptions that are little more
   * than wrappers for other throwables.
   *
   * @param cause The cause (which is saved for later retrieval by the {@link
   * #getCause()} method).  (A null value is permitted, and indicates that the
   * cause is nonexistent or unknown.)
   * @param result error code describing the failure category.
   * @since 1.6
   */
  public SCMException(Throwable cause, ResultCodes result) {
    super(cause);
    this.result = result;
  }

  /**
   * Returns resultCode.
   * @return ResultCode
   */
  public ResultCodes getResult() {
    return result;
  }

  /**
   * Error codes to make it easy to decode these exceptions.
   * <p>
   * NOTE(review): {@code SUCCEESS} is a misspelling of "SUCCESS", but renaming
   * an enum constant is a breaking change for callers (and for anything that
   * relies on constant names or ordinals); fix only in a coordinated rename.
   * Do not reorder these constants — ordinal positions may be depended upon.
   */
  public enum ResultCodes {
    SUCCEESS,
    FAILED_TO_LOAD_NODEPOOL,
    FAILED_TO_FIND_NODE_IN_POOL,
    FAILED_TO_FIND_HEALTHY_NODES,
    FAILED_TO_FIND_NODES_WITH_SPACE,
    FAILED_TO_FIND_SUITABLE_NODE,
    INVALID_CAPACITY,
    INVALID_BLOCK_SIZE,
    CHILL_MODE_EXCEPTION,
    FAILED_TO_LOAD_OPEN_CONTAINER,
    FAILED_TO_ALLOCATE_CONTAINER,
    FAILED_TO_CHANGE_CONTAINER_STATE,
    CONTAINER_EXISTS,
    FAILED_TO_FIND_CONTAINER,
    FAILED_TO_FIND_CONTAINER_WITH_SPACE,
    BLOCK_EXISTS,
    FAILED_TO_FIND_BLOCK,
    IO_EXCEPTION,
    UNEXPECTED_CONTAINER_STATE,
    SCM_NOT_INITIALIZED
  }
}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java
new file mode 100644
index 0000000..7b69310
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.exceptions;
+// Exceptions thrown by SCM.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
new file mode 100644
index 0000000..edbcfa1
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Command Queue is queue of commands for the datanode.
+ * <p>
+ * Node manager, container Manager and key space managers can queue commands 
for
+ * datanodes into this queue. These commands will be send in the order in which
+ * there where queued.
+ */
+public class CommandQueue {
+  // This list is used as default return value.
+  private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
+  private final Map<UUID, Commands> commandMap;
+  private final Lock lock;
+  private long commandsInQueue;
+
+  /**
+   * Returns number of commands in queue.
+   * @return Command Count.
+   */
+  public long getCommandsInQueue() {
+    return commandsInQueue;
+  }
+
+  /**
+   * Constructs a Command Queue.
+   * TODO : Add a flusher thread that throws away commands older than a certain
+   * time period.
+   */
+  public CommandQueue() {
+    commandMap = new HashMap<>();
+    lock = new ReentrantLock();
+    commandsInQueue = 0;
+  }
+
+  /**
+   * This function is used only for test purposes.
+   */
+  @VisibleForTesting
+  public void clear() {
+    lock.lock();
+    try {
+      commandMap.clear();
+      commandsInQueue = 0;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns  a list of Commands for the datanode to execute, if we have no
+   * commands returns a empty list otherwise the current set of
+   * commands are returned and command map set to empty list again.
+   *
+   * @param datanodeUuid Datanode UUID
+   * @return List of SCM Commands.
+   */
+  @SuppressWarnings("unchecked")
+  List<SCMCommand> getCommand(final UUID datanodeUuid) {
+    lock.lock();
+    try {
+      Commands cmds = commandMap.remove(datanodeUuid);
+      List<SCMCommand> cmdList = null;
+      if(cmds != null) {
+        cmdList = cmds.getCommands();
+        commandsInQueue -= cmdList.size() > 0 ? cmdList.size() : 0;
+        // A post condition really.
+        Preconditions.checkState(commandsInQueue >= 0);
+      }
+      return cmds == null ? DEFAULT_LIST : cmdList;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Adds a Command to the SCM Queue to send the command to container.
+   *
+   * @param datanodeUuid DatanodeDetails.Uuid
+   * @param command    - Command
+   */
+  public void addCommand(final UUID datanodeUuid, final SCMCommand
+      command) {
+    lock.lock();
+    try {
+      if (commandMap.containsKey(datanodeUuid)) {
+        commandMap.get(datanodeUuid).add(command);
+      } else {
+        commandMap.put(datanodeUuid, new Commands(command));
+      }
+      commandsInQueue++;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Class that stores commands for a datanode.
+   */
+  private static class Commands {
+    private long updateTime;
+    private long readTime;
+    private List<SCMCommand> commands;
+
+    /**
+     * Constructs a Commands class.
+     */
+    Commands() {
+      commands = new LinkedList<>();
+      updateTime = 0;
+      readTime = 0;
+    }
+
+    /**
+     * Creates the object and populates with the command.
+     * @param command command to add to queue.
+     */
+    Commands(SCMCommand command) {
+      this();
+      this.add(command);
+    }
+
+    /**
+     * Gets the last time the commands for this node was updated.
+     * @return Time stamp
+     */
+    public long getUpdateTime() {
+      return updateTime;
+    }
+
+    /**
+     * Gets the last read time.
+     * @return last time when these commands were read from this queue.
+     */
+    public long getReadTime() {
+      return readTime;
+    }
+
+    /**
+     * Adds a command to the list.
+     *
+     * @param command SCMCommand
+     */
+    public void add(SCMCommand command) {
+      this.commands.add(command);
+      updateTime = Time.monotonicNow();
+    }
+
+    /**
+     * Returns the commands for this datanode.
+     * @return command list.
+     */
+    public List<SCMCommand> getCommands() {
+      List<SCMCommand> temp = this.commands;
+      this.commands = new LinkedList<>();
+      readTime = Time.monotonicNow();
+      return temp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
new file mode 100644
index 0000000..43720f0
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * This class represents the item in SCM heartbeat queue.
+ */
+public class HeartbeatQueueItem {
+  private DatanodeDetails datanodeDetails;
+  private long recvTimestamp;
+  private SCMNodeReport nodeReport;
+  private ReportState containerReportState;
+
+  /**
+   *
+   * @param datanodeDetails - datanode ID of the heartbeat.
+   * @param recvTimestamp - heartbeat receive timestamp.
+   * @param nodeReport - node report associated with the heartbeat if any.
+   * @param containerReportState - container report state.
+   */
+  HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp,
+      SCMNodeReport nodeReport, ReportState containerReportState) {
+    this.datanodeDetails = datanodeDetails;
+    this.recvTimestamp = recvTimestamp;
+    this.nodeReport = nodeReport;
+    this.containerReportState = containerReportState;
+  }
+
+  /**
+   * @return datanode ID.
+   */
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  /**
+   * @return node report.
+   */
+  public SCMNodeReport getNodeReport() {
+    return nodeReport;
+  }
+
+  /**
+   * @return container report state.
+   */
+  public ReportState getContainerReportState() {
+    return containerReportState;
+  }
+
+  /**
+   * @return heartbeat receive timestamp.
+   */
+  public long getRecvTimestamp() {
+    return recvTimestamp;
+  }
+
+  /**
+   * Builder for HeartbeatQueueItem.
+   */
+  public static class Builder {
+    private DatanodeDetails datanodeDetails;
+    private SCMNodeReport nodeReport;
+    private ReportState containerReportState;
+    private long recvTimestamp = monotonicNow();
+
+    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
+      this.datanodeDetails = dnDetails;
+      return this;
+    }
+
+    public Builder setNodeReport(SCMNodeReport scmNodeReport) {
+      this.nodeReport = scmNodeReport;
+      return this;
+    }
+
+    public Builder setContainerReportState(ReportState crs) {
+      this.containerReportState = crs;
+      return this;
+    }
+
+    @VisibleForTesting
+    public Builder setRecvTimestamp(long recvTime) {
+      this.recvTimestamp = recvTime;
+      return this;
+    }
+
+    public HeartbeatQueueItem build() {
+      return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport,
+          containerReportState);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
new file mode 100644
index 0000000..4392633
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
/**
 * A node manager supports a simple interface for managing a datanode.
 * <p/>
 * 1. A datanode registers with the NodeManager.
 * <p/>
 * 2. If the node is allowed to register, we add that to the nodes that we need
 * to keep track of.
 * <p/>
 * 3. A heartbeat is made by the node at a fixed frequency.
 * <p/>
 * 4. A node can be in any of these 4 states: {HEALTHY, STALE, DEAD,
 * DECOMMISSIONED}
 * <p/>
 * HEALTHY - It is a datanode that is regularly heartbeating us.
 *
 * STALE - A datanode for which we have missed a few heartbeats.
 *
 * DEAD - A datanode that we have not heard from for a while.
 *
 * DECOMMISSIONED - Someone told us to remove this node from the tracking
 * list, by calling removeNode. We will throw away this node's info soon.
 */
public interface NodeManager extends StorageContainerNodeProtocol,
    NodeManagerMXBean, Closeable, Runnable {
  /**
   * Removes a data node from the management of this Node Manager.
   *
   * @param node - DataNode.
   * @throws UnregisteredNodeException if the node is not currently tracked.
   */
  void removeNode(DatanodeDetails node) throws UnregisteredNodeException;

  /**
   * Gets all datanodes that are currently in the requested state.
   * @param nodeState - State of the node
   * @return List of Datanodes that are in the given state.
   */
  List<DatanodeDetails> getNodes(NodeState nodeState);

  /**
   * Returns the number of datanodes that are in the requested state.
   * @param nodeState - State of the node
   * @return int -- count
   */
  int getNodeCount(NodeState nodeState);

  /**
   * Get all datanodes known to SCM.
   *
   * @return List of DatanodeDetails known to SCM.
   */
  List<DatanodeDetails> getAllNodes();

  /**
   * Chill mode is the period when node manager waits for a minimum
   * configured number of datanodes to report in. This is called chill mode
   * to indicate the period before node manager gets into action.
   *
   * Forcefully exits the chill mode, even if we have not met the minimum
   * criteria of the nodes reporting in.
   */
  void forceExitChillMode();

  /**
   * Puts the node manager into manual chill mode.
   */
  void enterChillMode();

  /**
   * Brings node manager out of manual chill mode.
   */
  void exitChillMode();

  /**
   * Returns the aggregated node stats.
   * @return the aggregated node stats.
   */
  SCMNodeStat getStats();

  /**
   * Return a map of node stats.
   * @return a map of individual node stats (live/stale but not dead).
   */
  Map<UUID, SCMNodeStat> getNodeStats();

  /**
   * Return the node stat of the specified datanode.
   * @param datanodeDetails DatanodeDetails.
   * @return node stat if it is live/stale, null if it is dead or doesn't
   * exist.
   */
  SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails);

  /**
   * Returns the NodePoolManager associated with the NodeManager.
   * @return NodePoolManager
   */
  NodePoolManager getNodePoolManager();

  /**
   * Wait for the heartbeat to be processed by NodeManager.
   * @return true if heartbeat has been processed.
   */
  @VisibleForTesting
  boolean waitForHeartbeatProcessed();

  /**
   * Returns the node state of a specific node.
   * @param datanodeDetails DatanodeDetails
   * @return Healthy/Stale/Dead.
   */
  NodeState getNodeState(DatanodeDetails datanodeDetails);

  /**
   * Add a {@link SCMCommand} to the command queue, which is
   * handled by the HB thread asynchronously.
   * @param dnId datanode uuid
   * @param command the command to deliver on the next heartbeat response.
   */
  void addDatanodeCommand(UUID dnId, SCMCommand command);
}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java
new file mode 100644
index 0000000..3ac993b
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Map;
+
/**
 * This is the JMX management interface for node manager information.
 * Values exposed here are intended for monitoring tools, so they use only
 * JMX-friendly types (primitives, String, and simple maps).
 */
@InterfaceAudience.Private
public interface NodeManagerMXBean {
  /**
   * Get the minimum number of nodes required to get out of chill mode.
   *
   * @return int
   */
  int getMinimumChillModeNodes();

  /**
   * Returns a human-readable chill mode status string.
   * @return String
   */
  String getChillModeStatus();


  /**
   * Returns true if node manager is out of chill mode, else false.
   * @return true if out of chill mode, else false
   */
  boolean isOutOfChillMode();

  /**
   * Get the number of data nodes in each state.
   *
   * @return A mapping from state name to the number of nodes in that state.
   */
  Map<String, Integer> getNodeCount();
}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
new file mode 100644
index 0000000..46faf9ca
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
/**
 * Interface that defines SCM NodePoolManager.
 * A node pool is a named group of datanodes; this interface manages the
 * membership of nodes in pools.
 */
public interface NodePoolManager extends Closeable {

  /**
   * Add a node to a node pool.
   * @param pool - name of the node pool.
   * @param node - data node.
   * @throws IOException if the membership update cannot be persisted.
   */
  void addNode(String pool, DatanodeDetails node) throws IOException;

  /**
   * Remove a node from a node pool.
   * @param pool - name of the node pool.
   * @param node - data node.
   * @throws SCMException if the pool or node is unknown.
   */
  void removeNode(String pool, DatanodeDetails node)
      throws SCMException;

  /**
   * Get a list of known node pools.
   * @return a list of known node pool names, or an empty list if no node pool
   * is defined.
   */
  List<String> getNodePools();

  /**
   * Get all nodes of a node pool given the name of the node pool.
   * @param pool - name of the node pool.
   * @return a list of datanode ids or an empty list if the node pool was not
   *  found.
   */
  List<DatanodeDetails> getNodes(String pool);

  /**
   * Get the node pool name if the node has been added to a node pool.
   * @param datanodeDetails - datanode ID.
   * @return node pool name if it has been assigned;
   * null if the node has not been assigned to any node pool yet.
   * @throws SCMException if the lookup fails internally.
   */
  String getNodePool(DatanodeDetails datanodeDetails) throws SCMException;
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to