http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
deleted file mode 100644
index 71e17e9..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ /dev/null
@@ -1,699 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
-import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
-
-/**
- * Mapping class maintains the mapping from a container name to its pipeline.
- * This is used by SCM when allocating new locations and when looking up a key.
- */
-public class ContainerMapping implements Mapping {
-  private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping
-      .class);
-
-  private final NodeManager nodeManager;
-  private final long cacheSize;
-  private final Lock lock;
-  private final Charset encoding = Charset.forName("UTF-8");
-  private final MetadataStore containerStore;
-  private final PipelineSelector pipelineSelector;
-  private final ContainerStateManager containerStateManager;
-  private final LeaseManager<ContainerInfo> containerLeaseManager;
-  private final EventPublisher eventPublisher;
-  private final long size;
-
-  /**
-   * Constructs a mapping class that creates mapping between container names
-   * and pipelines.
-   *
-   * @param nodeManager - NodeManager so that we can get the nodes that are
-   * healthy to place new
-   * containers.
-   * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
-   * its nodes. This is
-   * passed to LevelDB and this memory is allocated in Native code space.
-   * CacheSize is specified
-   * in MB.
-   * @throws IOException on Failure.
-   */
-  @SuppressWarnings("unchecked")
-  public ContainerMapping(
-      final Configuration conf, final NodeManager nodeManager, final int
-      cacheSizeMB, EventPublisher eventPublisher) throws IOException {
-    this.nodeManager = nodeManager;
-    this.cacheSize = cacheSizeMB;
-
-    File metaDir = getOzoneMetaDirPath(conf);
-
-    // Write the container name to pipeline mapping.
-    File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
-    containerStore =
-        MetadataStoreBuilder.newBuilder()
-            .setConf(conf)
-            .setDbFile(containerDBPath)
-            .setCacheSize(this.cacheSize * OzoneConsts.MB)
-            .build();
-
-    this.lock = new ReentrantLock();
-
-    size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
-        OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-
-    this.pipelineSelector = new PipelineSelector(nodeManager,
-            conf, eventPublisher, cacheSizeMB);
-
-    this.containerStateManager =
-        new ContainerStateManager(conf, this, pipelineSelector);
-    LOG.trace("Container State Manager created.");
-
-    this.eventPublisher = eventPublisher;
-
-    long containerCreationLeaseTimeout = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    containerLeaseManager = new LeaseManager<>("ContainerCreation",
-        containerCreationLeaseTimeout);
-    containerLeaseManager.start();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public ContainerInfo getContainer(final long containerID) throws
-      IOException {
-    ContainerInfo containerInfo;
-    lock.lock();
-    try {
-      byte[] containerBytes = containerStore.get(
-          Longs.toByteArray(containerID));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified key does not exist. key : " + containerID,
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-
-      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
-          .parseFrom(containerBytes);
-      containerInfo = ContainerInfo.fromProtobuf(temp);
-      return containerInfo;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Returns the ContainerInfo and pipeline for the given containerID. If the
-   * container has no available replicas on datanodes, it returns a pipeline
-   * with no datanodes and an empty leaderID. Pipeline#isEmpty can be used to
-   * check for an empty pipeline.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerWithPipeline such as creation state and the pipeline.
-   * @throws IOException
-   */
-  @Override
-  public ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException {
-    ContainerInfo contInfo;
-    lock.lock();
-    try {
-      byte[] containerBytes = containerStore.get(
-          Longs.toByteArray(containerID));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified key does not exist. key : " + containerID,
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
-          .parseFrom(containerBytes);
-      contInfo = ContainerInfo.fromProtobuf(temp);
-
-      Pipeline pipeline;
-      String leaderId = "";
-      if (contInfo.isContainerOpen()) {
-        // If a pipeline with the given pipeline Id already exists, return it
-        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
-      } else {
-        // For closed containers, create a pipeline from the datanodes with replicas
-        Set<DatanodeDetails> dnWithReplicas = containerStateManager
-            .getContainerReplicas(contInfo.containerID());
-        if (!dnWithReplicas.isEmpty()) {
-          leaderId = dnWithReplicas.iterator().next().getUuidString();
-        }
-        pipeline = new Pipeline(leaderId, contInfo.getState(),
-            ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
-            PipelineID.randomId());
-        dnWithReplicas.forEach(pipeline::addMember);
-      }
-      return new ContainerWithPipeline(contInfo, pipeline);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<ContainerInfo> listContainer(long startContainerID,
-      int count) throws IOException {
-    List<ContainerInfo> containerList = new ArrayList<>();
-    lock.lock();
-    try {
-      if (containerStore.isEmpty()) {
-        throw new IOException("No container exists in current db");
-      }
-      byte[] startKey = startContainerID <= 0 ? null :
-          Longs.toByteArray(startContainerID);
-      List<Map.Entry<byte[], byte[]>> range =
-          containerStore.getSequentialRangeKVs(startKey, count, null);
-
-      // Transform the values into the pipelines.
-      // TODO: filter by container state
-      for (Map.Entry<byte[], byte[]> entry : range) {
-        ContainerInfo containerInfo =
-            ContainerInfo.fromProtobuf(
-                HddsProtos.SCMContainerInfo.PARSER.parseFrom(
-                    entry.getValue()));
-        Preconditions.checkNotNull(containerInfo);
-        containerList.add(containerInfo);
-      }
-    } finally {
-      lock.unlock();
-    }
-    return containerList;
-  }
-
-  /**
-   * Allocates a new container.
-   *
-   * @param replicationFactor - replication factor of the container.
-   * @param owner - The string name of the Service that owns this container.
-   * @return - Pipeline that makes up this container.
-   * @throws IOException - Exception
-   */
-  @Override
-  public ContainerWithPipeline allocateContainer(
-      ReplicationType type,
-      ReplicationFactor replicationFactor,
-      String owner)
-      throws IOException {
-
-    ContainerInfo containerInfo;
-    ContainerWithPipeline containerWithPipeline;
-
-    lock.lock();
-    try {
-      containerWithPipeline = containerStateManager.allocateContainer(
-              pipelineSelector, type, replicationFactor, owner);
-      containerInfo = containerWithPipeline.getContainerInfo();
-
-      byte[] containerIDBytes = Longs.toByteArray(
-          containerInfo.getContainerID());
-      containerStore.put(containerIDBytes, containerInfo.getProtobuf()
-              .toByteArray());
-    } finally {
-      lock.unlock();
-    }
-    return containerWithPipeline;
-  }
-
-  /**
-   * Deletes a container from SCM.
-   *
-   * @param containerID - Container ID
-   * @throws IOException if container doesn't exist or container store failed
-   *                     to delete the
-   *                     specified key.
-   */
-  @Override
-  public void deleteContainer(long containerID) throws IOException {
-    lock.lock();
-    try {
-      byte[] dbKey = Longs.toByteArray(containerID);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to delete container " + containerID + ", reason : " +
-                "container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      containerStore.delete(dbKey);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc} Used by client to update container state on SCM.
-   */
-  @Override
-  public HddsProtos.LifeCycleState updateContainerState(
-      long containerID, HddsProtos.LifeCycleEvent event) throws
-      IOException {
-    ContainerInfo containerInfo;
-    lock.lock();
-    try {
-      byte[] dbKey = Longs.toByteArray(containerID);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to update container state"
-                + containerID
-                + ", reason : container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      containerInfo =
-          ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
-              .parseFrom(containerBytes));
-
-      Preconditions.checkNotNull(containerInfo);
-      switch (event) {
-      case CREATE:
-        // Acquire lease on container
-        Lease<ContainerInfo> containerLease =
-            containerLeaseManager.acquire(containerInfo);
-        // Register callback to be executed in case of timeout
-        containerLease.registerCallBack(() -> {
-          updateContainerState(containerID,
-              HddsProtos.LifeCycleEvent.TIMEOUT);
-          return null;
-        });
-        break;
-      case CREATED:
-        // Release the lease on container
-        containerLeaseManager.release(containerInfo);
-        break;
-      case FINALIZE:
-        // TODO: we don't need a lease manager here for closing as the
-        // container report will include the container state after HDFS-13008
-        // If a client failed to update the container close state, DN container
-        // report from 3 DNs will be used to close the container eventually.
-        break;
-      case CLOSE:
-        break;
-      case UPDATE:
-        break;
-      case DELETE:
-        break;
-      case TIMEOUT:
-        break;
-      case CLEANUP:
-        break;
-      default:
-        throw new SCMException("Unsupported container LifeCycleEvent.",
-            FAILED_TO_CHANGE_CONTAINER_STATE);
-      }
-      // If the below updateContainerState call fails, we should revert the
-      // changes made in switch case.
-      // Like releasing the lease in case of BEGIN_CREATE.
-      ContainerInfo updatedContainer = containerStateManager
-          .updateContainerState(containerInfo, event);
-      if (!updatedContainer.isContainerOpen()) {
-        pipelineSelector.removeContainerFromPipeline(
-                containerInfo.getPipelineID(), containerID);
-      }
-      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
-      return updatedContainer.getState();
-    } catch (LeaseException e) {
-      throw new IOException("Lease Exception.", e);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Update deleteTransactionId according to deleteTransactionMap.
-   *
-   * @param deleteTransactionMap Maps the containerId to latest delete
-   *                             transaction id for the container.
-   * @throws IOException
-   */
-  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
-      throws IOException {
-    if (deleteTransactionMap == null) {
-      return;
-    }
-
-    lock.lock();
-    try {
-      BatchOperation batch = new BatchOperation();
-      for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
-        long containerID = entry.getKey();
-        byte[] dbKey = Longs.toByteArray(containerID);
-        byte[] containerBytes = containerStore.get(dbKey);
-        if (containerBytes == null) {
-          throw new SCMException(
-              "Failed to increment number of deleted blocks for container "
-                  + containerID + ", reason : " + "container doesn't exist.",
-              SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-        }
-        ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
-            HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
-        containerInfo.updateDeleteTransactionId(entry.getValue());
-        batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
-      }
-      containerStore.writeBatch(batch);
-      containerStateManager
-          .updateDeleteTransactionId(deleteTransactionMap);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Returns the container State Manager.
-   *
-   * @return ContainerStateManager
-   */
-  @Override
-  public ContainerStateManager getStateManager() {
-    return containerStateManager;
-  }
-
-  /**
-   * Return a container matching the attributes specified.
-   *
-   * @param sizeRequired - Space needed in the Container.
-   * @param owner - Owner of the container - A specific nameservice.
-   * @param type - Replication Type {StandAlone, Ratis}
-   * @param factor - Replication Factor {ONE, THREE}
-   * @param state - State of the Container-- {Open, Allocated etc.}
-   * @return ContainerInfo, null if there is no match found.
-   */
-  public ContainerWithPipeline getMatchingContainerWithPipeline(
-      final long sizeRequired, String owner, ReplicationType type,
-      ReplicationFactor factor, LifeCycleState state) throws IOException {
-    ContainerInfo containerInfo = getStateManager()
-        .getMatchingContainer(sizeRequired, owner, type, factor, state);
-    if (containerInfo == null) {
-      return null;
-    }
-    Pipeline pipeline = pipelineSelector
-        .getPipeline(containerInfo.getPipelineID());
-    return new ContainerWithPipeline(containerInfo, pipeline);
-  }
-
-  /**
-   * Process container report from Datanode.
-   * <p>
-   * Processing follows a very simple logic for the time being.
-   * <p>
-   * 1. Datanodes report the current state -- denoted by the datanodeState.
-   * <p>
-   * 2. We read the older SCM state from the database -- denoted by
-   * the knownState.
-   * <p>
-   * 3. We copy the usage etc. from datanodeState to newState and log that
-   * newState to the DB. This allows SCM to boot up again, read the
-   * state of the world from the DB, and then reconcile that state with
-   * container reports when they arrive.
-   *
-   * @param reports Container report
-   */
-  @Override
-  public void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports, boolean isRegisterCall)
-      throws IOException {
-    List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
-        containerInfos = reports.getReportsList();
-    PendingDeleteStatusList pendingDeleteStatusList =
-        new PendingDeleteStatusList(datanodeDetails);
-    for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
-        containerInfos) {
-      // Update replica info during registration process.
-      if (isRegisterCall) {
-        try {
-          getStateManager().addContainerReplica(ContainerID.
-              valueof(contInfo.getContainerID()), datanodeDetails);
-        } catch (Exception ex) {
-          // Continue to next one after logging the error.
-          LOG.error("Error while adding replica for containerId {}.",
-              contInfo.getContainerID(), ex);
-        }
-      }
-      byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
-      lock.lock();
-      try {
-        byte[] containerBytes = containerStore.get(dbKey);
-        if (containerBytes != null) {
-          HddsProtos.SCMContainerInfo knownState =
-              HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
-
-          if (knownState.getState() == LifeCycleState.CLOSING
-              && contInfo.getState() == LifeCycleState.CLOSED) {
-
-            updateContainerState(contInfo.getContainerID(),
-                LifeCycleEvent.CLOSE);
-
-            //reread the container
-            knownState =
-                HddsProtos.SCMContainerInfo.PARSER
-                    .parseFrom(containerStore.get(dbKey));
-          }
-
-          HddsProtos.SCMContainerInfo newState =
-              reconcileState(contInfo, knownState, datanodeDetails);
-
-          if (knownState.getDeleteTransactionId() > contInfo
-              .getDeleteTransactionId()) {
-            pendingDeleteStatusList
-                .addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
-                    knownState.getDeleteTransactionId(),
-                    knownState.getContainerID());
-          }
-
-          // FIX ME: This can be optimized, we write twice to memory, where a
-          // single write would work well.
-          //
-          // We need to write this to the DB again since the close above only
-          // persisted the updated state.
-          containerStore.put(dbKey, newState.toByteArray());
-
-        } else {
-          // Container not found in our container db.
-          LOG.error("Error while processing container report from datanode :" +
-                  " {}, for container: {}, reason: container doesn't exist in" 
+
-                  "container database.", datanodeDetails,
-              contInfo.getContainerID());
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
-      eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
-          pendingDeleteStatusList);
-    }
-
-  }
-
-  /**
-   * Reconciles the state from Datanode with the state in SCM.
-   *
-   * @param datanodeState - State from the Datanode.
-   * @param knownState - State inside SCM.
-   * @param dnDetails
-   * @return new SCM State for this container.
-   */
-  private HddsProtos.SCMContainerInfo reconcileState(
-      StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
-      SCMContainerInfo knownState, DatanodeDetails dnDetails) {
-    HddsProtos.SCMContainerInfo.Builder builder =
-        HddsProtos.SCMContainerInfo.newBuilder();
-    builder.setContainerID(knownState.getContainerID())
-        .setPipelineID(knownState.getPipelineID())
-        .setReplicationType(knownState.getReplicationType())
-        .setReplicationFactor(knownState.getReplicationFactor());
-
-    // TODO: If current state doesn't have this DN in list of DataNodes with
-    // replica then add it in list of replicas.
-
-    // If used size is greater than allocated size, we will be updating
-    // allocated size with used size. This update is done as a fallback
-    // mechanism in case SCM crashes without properly updating allocated
-    // size. Correct allocated value will be updated by
-    // ContainerStateManager during SCM shutdown.
-    long usedSize = datanodeState.getUsed();
-    long allocated = knownState.getAllocatedBytes() > usedSize ?
-        knownState.getAllocatedBytes() : usedSize;
-    builder.setAllocatedBytes(allocated)
-        .setUsedBytes(usedSize)
-        .setNumberOfKeys(datanodeState.getKeyCount())
-        .setState(knownState.getState())
-        .setStateEnterTime(knownState.getStateEnterTime())
-        .setContainerID(knownState.getContainerID())
-        .setDeleteTransactionId(knownState.getDeleteTransactionId());
-    if (knownState.getOwner() != null) {
-      builder.setOwner(knownState.getOwner());
-    }
-    return builder.build();
-  }
-
-
-  /**
-   * Checks whether a container should be closed, i.e. whether it is still in
-   * the open state.
-   *
-   * @param info - ContainerInfo.
-   * @return true if the container is in the open state, false otherwise.
-   */
-  private boolean shouldClose(ContainerInfo info) {
-    return info.getState() == HddsProtos.LifeCycleState.OPEN;
-  }
-
-  private boolean isClosed(ContainerInfo info) {
-    return info.getState() == HddsProtos.LifeCycleState.CLOSED;
-  }
-
-  /**
-   * Closes this stream and releases any system resources associated with it.
-   * If the stream is
-   * already closed then invoking this method has no effect.
-   * <p>
-   * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
-   * fail require careful
-   * attention. It is strongly advised to relinquish the underlying resources
-   * and to internally
-   * <em>mark</em> the {@code Closeable} as closed, prior to throwing the
-   * {@code IOException}.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override
-  public void close() throws IOException {
-    if (containerLeaseManager != null) {
-      containerLeaseManager.shutdown();
-    }
-    if (containerStateManager != null) {
-      flushContainerInfo();
-      containerStateManager.close();
-    }
-    if (containerStore != null) {
-      containerStore.close();
-    }
-
-    if (pipelineSelector != null) {
-      pipelineSelector.shutdown();
-    }
-  }
-
-  /**
-   * Since the allocatedBytes of a container is kept only in memory, in
-   * containerStateManager, we need to persist it to the container store
-   * when closing ContainerMapping.
-   *
-   * @throws IOException on failure.
-   */
-  @VisibleForTesting
-  public void flushContainerInfo() throws IOException {
-    List<ContainerInfo> containers = containerStateManager.getAllContainers();
-    List<Long> failedContainers = new ArrayList<>();
-    for (ContainerInfo info : containers) {
-      // even if some container updated failed, others can still proceed
-      try {
-        byte[] dbKey = Longs.toByteArray(info.getContainerID());
-        byte[] containerBytes = containerStore.get(dbKey);
-        // TODO : looks like when a container is deleted, the container is
-        // removed from containerStore but not containerStateManager, so it can
-        // return info of a deleted container. may revisit this in the future,
-        // for now, just skip a not-found container
-        if (containerBytes != null) {
-          containerStore.put(dbKey, info.getProtobuf().toByteArray());
-        } else {
-          LOG.debug("Container state manager has container {} but not found " +
-                  "in container store, a deleted container?",
-              info.getContainerID());
-        }
-      } catch (IOException ioe) {
-        failedContainers.add(info.getContainerID());
-      }
-    }
-    if (!failedContainers.isEmpty()) {
-      throw new IOException("Error in flushing container info from container " +
-          "state manager: " + failedContainers);
-    }
-  }
-
-  @VisibleForTesting
-  public MetadataStore getContainerStore() {
-    return containerStore;
-  }
-
-  public PipelineSelector getPipelineSelector() {
-    return pipelineSelector;
-  }
-}
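
Note: the CREATE/CREATED/TIMEOUT handling deleted above hinged on a lease per
container under creation. Below is a minimal sketch of that handshake, using
only the LeaseManager/Lease calls visible in this diff; the class name, the
60 second timeout, and the fireTimeout hook are illustrative, and the
exception handling mirrors the deleted code's catch of LeaseException.

import java.io.IOException;

import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;

/** Sketch of the container-creation lease handshake (illustrative names). */
class ContainerCreationLeaseSketch {
  private final LeaseManager<ContainerInfo> leaseManager =
      new LeaseManager<>("ContainerCreation", 60_000L);

  ContainerCreationLeaseSketch() {
    leaseManager.start();
  }

  /** CREATE: acquire a lease; if it expires, the callback fires a timeout. */
  void onCreate(ContainerInfo info, Runnable fireTimeout) throws IOException {
    try {
      Lease<ContainerInfo> lease = leaseManager.acquire(info);
      lease.registerCallBack(() -> {
        // The deleted code fired LifeCycleEvent.TIMEOUT here, driving the
        // container from CREATING to DELETING.
        fireTimeout.run();
        return null;
      });
    } catch (LeaseException e) {
      throw new IOException("Lease Exception.", e);
    }
  }

  /** CREATED: the client confirmed in time, so the lease is released. */
  void onCreated(ContainerInfo info) throws IOException {
    try {
      leaseManager.release(info);
    } catch (LeaseException e) {
      throw new IOException("Lease Exception.", e);
    }
  }

  void close() {
    leaseManager.shutdown();
  }
}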

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
deleted file mode 100644
index 71935f0..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles container reports from datanode.
- */
-public class ContainerReportHandler implements
-    EventHandler<ContainerReportFromDatanode> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerReportHandler.class);
-
-  private final NodeManager nodeManager;
-
-  private final Mapping containerMapping;
-
-  private ContainerStateManager containerStateManager;
-
-  private ReplicationActivityStatus replicationStatus;
-
-  public ContainerReportHandler(Mapping containerMapping,
-      NodeManager nodeManager,
-      ReplicationActivityStatus replicationActivityStatus) {
-    Preconditions.checkNotNull(containerMapping);
-    Preconditions.checkNotNull(nodeManager);
-    Preconditions.checkNotNull(replicationActivityStatus);
-    this.containerStateManager = containerMapping.getStateManager();
-    this.nodeManager = nodeManager;
-    this.containerMapping = containerMapping;
-    this.replicationStatus = replicationActivityStatus;
-  }
-
-  @Override
-  public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
-      EventPublisher publisher) {
-
-    DatanodeDetails datanodeOrigin =
-        containerReportFromDatanode.getDatanodeDetails();
-
-    ContainerReportsProto containerReport =
-        containerReportFromDatanode.getReport();
-    try {
-
-      //update state in container db and trigger close container events
-      containerMapping
-          .processContainerReports(datanodeOrigin, containerReport, false);
-
-      Set<ContainerID> containerIds = containerReport.getReportsList().stream()
-          .map(StorageContainerDatanodeProtocolProtos
-              .ContainerInfo::getContainerID)
-          .map(ContainerID::new)
-          .collect(Collectors.toSet());
-
-      ReportResult<ContainerID> reportResult = nodeManager
-          .processContainerReport(datanodeOrigin.getUuid(), containerIds);
-
-      //we have the report, so we can update the states for the next iteration.
-      nodeManager
-          .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
-
-      for (ContainerID containerID : reportResult.getMissingEntries()) {
-        containerStateManager
-            .removeContainerReplica(containerID, datanodeOrigin);
-        checkReplicationState(containerID, publisher);
-      }
-
-      for (ContainerID containerID : reportResult.getNewEntries()) {
-        containerStateManager.addContainerReplica(containerID, datanodeOrigin);
-        checkReplicationState(containerID, publisher);
-      }
-
-    } catch (IOException e) {
-      //TODO: stop all the replication?
-      LOG.error("Error on processing container report from datanode {}",
-          datanodeOrigin, e);
-    }
-
-  }
-
-  private void checkReplicationState(ContainerID containerID,
-      EventPublisher publisher)
-      throws SCMException {
-    ContainerInfo container = containerStateManager.getContainer(containerID);
-
-    if (container == null) {
-      //warning unknown container
-      LOG.warn(
-          "Container is missing from containerStateManager. Can't request "
-              + "replication. {}",
-          containerID);
-      return;
-    }
-    if (container.isContainerOpen()) {
-      return;
-    }
-
-    ReplicationRequest replicationState =
-        containerStateManager.checkReplicationState(containerID);
-    if (replicationState != null) {
-      if (replicationStatus.isReplicationEnabled()) {
-        publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-            replicationState);
-      } else {
-        LOG.warn(
-            "Over/under replicated container but the replication is not "
-                + "(yet) enabled: "
-                + replicationState.toString());
-      }
-    }
-
-  }
-}
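
Note: the handler above acts on ReportResult#getMissingEntries() and
ReportResult#getNewEntries(), i.e. a set difference between the container set
SCM last knew for the datanode and the set just reported. A minimal,
self-contained sketch of that difference (simplified to Long IDs in place of
ContainerID; the class and method names are illustrative):

import java.util.HashSet;
import java.util.Set;

/** Sketch of the report diff behind ReportResult (illustrative names). */
final class ReportDiffSketch {

  /** Known to SCM but no longer reported: a replica was lost. */
  static Set<Long> missingEntries(Set<Long> known, Set<Long> reported) {
    Set<Long> missing = new HashSet<>(known);
    missing.removeAll(reported);
    return missing;
  }

  /** Reported but not yet known to SCM: a replica appeared. */
  static Set<Long> newEntries(Set<Long> known, Set<Long> reported) {
    Set<Long> added = new HashSet<>(reported);
    added.removeAll(known);
    return added;
  }
}

Each missing entry triggers removeContainerReplica plus a replication check,
and each new entry triggers addContainerReplica plus the same check; the
REPLICATE_CONTAINER event is only fired while replication is enabled.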

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
deleted file mode 100644
index 930c098..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ /dev/null
@@ -1,570 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.container;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
-import org.apache.hadoop.hdds.scm.container.states.ContainerState;
-import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.ozone.common.statemachine
-    .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.util.Time;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
-
-/**
- * A container state manager keeps track of container states and returns
- * containers that match various queries.
- * <p>
- * This state machine is driven by a combination of server and client actions.
- * <p>
- * This is how container creation happens: 1. When a container is created, the
- * server (or SCM) marks that container as being in the ALLOCATED state. In
- * this state, SCM has chosen a pipeline for the container to live on.
- * However, the container is not created yet. This container along with the
- * pipeline is returned to the client.
- * <p>
- * 2. The client, when it sees the container state as ALLOCATED, understands
- * that the container needs to be created on the specified pipeline. The
- * client lets the SCM know that it saw this flag and is initiating creation
- * on the data nodes.
- * <p>
- * This is done by calling notifyObjectCreation(ContainerName, BEGIN_CREATE).
- * When SCM gets this call, SCM puts the container state into CREATING. All
- * this state means is that SCM told the client to create a container and the
- * client saw that request.
- * <p>
- * 3. The client then makes calls to the datanodes directly, asking them to
- * create the container. This is done with the help of the pipeline that
- * supports this container.
- * <p>
- * 4. Once the creation of the container is complete, the client will make
- * another call to the SCM, this time specifying the containerName and
- * COMPLETE_CREATE as the event.
- * <p>
- * 5. With the COMPLETE_CREATE event, the container moves to an Open state.
- * This is the state in which clients can write to a container.
- * <p>
- * 6. If the client does not respond with the COMPLETE_CREATE event within a
- * certain time, the state machine times out and triggers a delete operation
- * on the container.
- * <p>
- * Please see the function initializeStateMachine below to see how this looks
- * in code.
- * <p>
- * Reusing existing containers:
- * <p>
- * The create container call is not made all the time; the system tries to use
- * open containers as much as possible. So in those cases, it looks through the
- * list of open containers and returns containers that match the specific
- * signature.
- * <p>
- * Please note: logically there are 3 separate state machines in the case of
- * containers.
- * <p>
- * The Create State Machine -- commented extensively above.
- * <p>
- * Open/Close State Machine -- once the container is in the Open state, it
- * will eventually be closed, once sufficient data has been written to it.
- * <p>
- * TimeOut Delete Container State Machine -- if container creation times out,
- * then the Container State Manager decides to delete the container.
- */
-public class ContainerStateManager implements Closeable {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerStateManager.class);
-
-  private final StateMachine<HddsProtos.LifeCycleState,
-      HddsProtos.LifeCycleEvent> stateMachine;
-
-  private final long containerSize;
-  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
-  private final ContainerStateMap containers;
-  private final AtomicLong containerCount;
-
-  /**
-   * Constructs a Container State Manager that tracks all containers owned by
-   * SCM for the purpose of allocation of blocks.
-   * <p>
-   * TODO : Add Container Tags so we know which containers are owned by SCM.
-   */
-  @SuppressWarnings("unchecked")
-  public ContainerStateManager(Configuration configuration,
-      Mapping containerMapping, PipelineSelector pipelineSelector) {
-
-    // Initialize the container state machine.
-    Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
-
-    // These are the steady states of a container.
-    finalStates.add(LifeCycleState.OPEN);
-    finalStates.add(LifeCycleState.CLOSED);
-    finalStates.add(LifeCycleState.DELETED);
-
-    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
-        finalStates);
-    initializeStateMachine();
-
-    this.containerSize = (long) configuration.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
-        StorageUnit.BYTES);
-
-    lastUsedMap = new ConcurrentHashMap<>();
-    containerCount = new AtomicLong(0);
-    containers = new ContainerStateMap();
-    loadExistingContainers(containerMapping, pipelineSelector);
-  }
-
-  private void loadExistingContainers(Mapping containerMapping,
-                                      PipelineSelector pipelineSelector) {
-
-    List<ContainerInfo> containerList;
-    try {
-      containerList = containerMapping.listContainer(0, Integer.MAX_VALUE);
-
-      // if there are no containers to load, let us return.
-      if (containerList == null || containerList.size() == 0) {
-        LOG.info("No containers to load for this cluster.");
-        return;
-      }
-    } catch (IOException e) {
-      if (!e.getMessage().equals("No container exists in current db")) {
-        LOG.error("Could not list the containers", e);
-      }
-      return;
-    }
-
-    try {
-      long maxID = 0;
-      for (ContainerInfo container : containerList) {
-        containers.addContainer(container);
-        pipelineSelector.addContainerToPipeline(
-                container.getPipelineID(), container.getContainerID());
-
-        if (maxID < container.getContainerID()) {
-          maxID = container.getContainerID();
-        }
-
-        containerCount.set(maxID);
-      }
-    } catch (SCMException ex) {
-      LOG.error("Unable to create a container information. ", ex);
-      // Fix me, what is the proper shutdown procedure for SCM ??
-      // System.exit(1) // Should we exit here?
-    }
-  }
-
-  /**
-   * Return the info of all the containers kept by the in-memory mapping.
-   *
-   * @return the list of all container info.
-   */
-  public List<ContainerInfo> getAllContainers() {
-    List<ContainerInfo> list = new ArrayList<>();
-
-    //No Locking needed since the return value is an immutable map.
-    containers.getContainerMap().forEach((key, value) -> list.add(value));
-    return list;
-  }
-
-  /*
-   *
-   * Event and State Transition Mapping:
-   *
-   * State: ALLOCATED ---------------> CREATING
-   * Event:                CREATE
-   *
-   * State: CREATING  ---------------> OPEN
-   * Event:               CREATED
-   *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
-   *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
-   *
-   * State: CLOSED   ----------------> DELETING
-   * Event:                DELETE
-   *
-   * State: DELETING ----------------> DELETED
-   * Event:               CLEANUP
-   *
-   * State: CREATING  ---------------> DELETING
-   * Event:               TIMEOUT
-   *
-   *
-   * Container State Flow:
-   *
-   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]------->[CLOSED]
-   *            (CREATE)     |    (CREATED)       (FINALIZE)     (CLOSE)    |
-   *                         |                                              |
-   *                         |                                              |
-   *                         |(TIMEOUT)                             (DELETE)|
-   *                         |                                              |
-   *                         +-------------> [DELETING] <-------------------+
-   *                                            |
-   *                                            |
-   *                                   (CLEANUP)|
-   *                                            |
-   *                                        [DELETED]
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(LifeCycleState.ALLOCATED,
-        LifeCycleState.CREATING,
-        LifeCycleEvent.CREATE);
-
-    stateMachine.addTransition(LifeCycleState.CREATING,
-        LifeCycleState.OPEN,
-        LifeCycleEvent.CREATED);
-
-    stateMachine.addTransition(LifeCycleState.OPEN,
-        LifeCycleState.CLOSING,
-        LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSING,
-        LifeCycleState.CLOSED,
-        LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSED,
-        LifeCycleState.DELETING,
-        LifeCycleEvent.DELETE);
-
-    stateMachine.addTransition(LifeCycleState.CREATING,
-        LifeCycleState.DELETING,
-        LifeCycleEvent.TIMEOUT);
-
-    stateMachine.addTransition(LifeCycleState.DELETING,
-        LifeCycleState.DELETED,
-        LifeCycleEvent.CLEANUP);
-  }
-
-  /**
-   * Allocates a new container based on the type, replication factor, etc.
-   *
-   * @param selector -- Pipeline selector class.
-   * @param type -- Replication type.
-   * @param replicationFactor - Replication replicationFactor.
-   * @return ContainerWithPipeline
-   * @throws IOException  on Failure.
-   */
-  public ContainerWithPipeline allocateContainer(PipelineSelector selector,
-      HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor replicationFactor, String owner)
-      throws IOException {
-
-    Pipeline pipeline = selector.getReplicationPipeline(type,
-        replicationFactor);
-
-    Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
-        + "replication=%s couldn't be found for the new container. "
-        + "Do you have enough nodes?", type, replicationFactor);
-
-    long containerID = containerCount.incrementAndGet();
-    ContainerInfo containerInfo = new ContainerInfo.Builder()
-        .setState(HddsProtos.LifeCycleState.ALLOCATED)
-        .setPipelineID(pipeline.getId())
-        // This is bytes allocated for blocks inside container, not the
-        // container size
-        .setAllocatedBytes(0)
-        .setUsedBytes(0)
-        .setNumberOfKeys(0)
-        .setStateEnterTime(Time.monotonicNow())
-        .setOwner(owner)
-        .setContainerID(containerID)
-        .setDeleteTransactionId(0)
-        .setReplicationFactor(replicationFactor)
-        .setReplicationType(pipeline.getType())
-        .build();
-    selector.addContainerToPipeline(pipeline.getId(), containerID);
-    Preconditions.checkNotNull(containerInfo);
-    containers.addContainer(containerInfo);
-    LOG.trace("New container allocated: {}", containerInfo);
-    return new ContainerWithPipeline(containerInfo, pipeline);
-  }
-
-  /**
-   * Update the Container State to the next state.
-   *
-   * @param info - ContainerInfo
-   * @param event - LifeCycle Event
-   * @return Updated ContainerInfo.
-   * @throws SCMException  on Failure.
-   */
-  public ContainerInfo updateContainerState(ContainerInfo
-      info, HddsProtos.LifeCycleEvent event) throws SCMException {
-    LifeCycleState newState;
-    try {
-      newState = this.stateMachine.getNextState(info.getState(), event);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update container state %s, " +
-              "reason: invalid state transition from state: %s upon " +
-              "event: %s.",
-          info.getContainerID(), info.getState(), event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
-    }
-
-    // This is a post condition after executing getNextState.
-    Preconditions.checkNotNull(newState);
-    containers.updateState(info, info.getState(), newState);
-    return containers.getContainerInfo(info);
-  }
-
-  /**
-   * Update the container State.
-   * @param info - Container Info
-   * @return  ContainerInfo
-   * @throws SCMException - on Error.
-   */
-  public ContainerInfo updateContainerInfo(ContainerInfo info)
-      throws SCMException {
-    containers.updateContainerInfo(info);
-    return containers.getContainerInfo(info);
-  }
-
-  /**
-   * Update deleteTransactionId for a container.
-   *
-   * @param deleteTransactionMap maps containerId to its new
-   *                             deleteTransactionID
-   */
-  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) {
-    for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
-      containers.getContainerMap().get(ContainerID.valueof(entry.getKey()))
-          .updateDeleteTransactionId(entry.getValue());
-    }
-  }
-
-  /**
-   * Return a container matching the attributes specified.
-   *
-   * @param size - Space needed in the Container.
-   * @param owner - Owner of the container - A specific nameservice.
-   * @param type - Replication Type {StandAlone, Ratis}
-   * @param factor - Replication Factor {ONE, THREE}
-   * @param state - State of the Container-- {Open, Allocated etc.}
-   * @return ContainerInfo, null if there is no match found.
-   */
-  public ContainerInfo getMatchingContainer(final long size,
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) {
-
-    // Find containers that match the query spec, if no match return null.
-    NavigableSet<ContainerID> matchingSet =
-        containers.getMatchingContainerIDs(state, owner, factor, type);
-    if (matchingSet == null || matchingSet.size() == 0) {
-      return null;
-    }
-
-    // Get the last used container and find container above the last used
-    // container ID.
-    ContainerState key = new ContainerState(owner, type, factor);
-    ContainerID lastID = lastUsedMap.get(key);
-    if (lastID == null) {
-      lastID = matchingSet.first();
-    }
-
-    // There is a small issue here. The first time, we will skip the first
-    // container. But in most cases it will not matter.
-    NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
-    if (resultSet.size() == 0) {
-      resultSet = matchingSet;
-    }
-
-    ContainerInfo selectedContainer =
-        findContainerWithSpace(size, resultSet, owner);
-    if (selectedContainer == null) {
-
-      // If we did not find any space in the tailSet, we need to look for
-      // space in the headSet. We pass true to deal with the situation where
-      // we have a lone container that has space. That is, we ignored the
-      // last used container under the assumption that we can find other
-      // containers with space, but with a single container that is not
-      // true. Hence we need to include the last used container as the
-      // last element in the sorted set.
-
-      resultSet = matchingSet.headSet(lastID, true);
-      selectedContainer = findContainerWithSpace(size, resultSet, owner);
-    }
-    // Update the allocated Bytes on this container.
-    if (selectedContainer != null) {
-      selectedContainer.updateAllocatedBytes(size);
-    }
-    return selectedContainer;
-
-  }
-
-  private ContainerInfo findContainerWithSpace(long size,
-      NavigableSet<ContainerID> searchSet, String owner) {
-    // Get the container with space to meet our request.
-    for (ContainerID id : searchSet) {
-      ContainerInfo containerInfo = containers.getContainerInfo(id);
-      if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
-        containerInfo.updateLastUsedTime();
-
-        ContainerState key = new ContainerState(owner,
-            containerInfo.getReplicationType(),
-            containerInfo.getReplicationFactor());
-        lastUsedMap.put(key, containerInfo.containerID());
-        return containerInfo;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Returns a set of ContainerIDs that match the Container.
-   *
-   * @param owner  Owner of the Containers.
-   * @param type - Replication Type of the containers
-   * @param factor - Replication factor of the containers.
-   * @param state - Current State, like Open, Close etc.
-   * @return Set of containers that match the specific query parameters.
-   */
-  public NavigableSet<ContainerID> getMatchingContainerIDs(
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) {
-    return containers.getMatchingContainerIDs(state, owner,
-        factor, type);
-  }
-
-  /**
-   * Returns the containerInfo with pipeline for the given container id.
-   * @param selector -- Pipeline selector class.
-   * @param containerID id of the container
-   * @return ContainerInfo containerInfo
-   * @throws IOException
-   */
-  public ContainerWithPipeline getContainer(PipelineSelector selector,
-      ContainerID containerID) {
-    ContainerInfo info = containers.getContainerInfo(containerID.getId());
-    Pipeline pipeline = selector.getPipeline(info.getPipelineID());
-    return new ContainerWithPipeline(info, pipeline);
-  }
-
-  /**
-   * Returns the containerInfo for the given container id.
-   * @param containerID id of the container
-   * @return ContainerInfo containerInfo
-   * @throws IOException
-   */
-  public ContainerInfo getContainer(ContainerID containerID) {
-    return containers.getContainerInfo(containerID);
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  /**
-   * Returns the latest list of DataNodes where replica for given containerId
-   * exist. Throws an SCMException if no entry is found for given containerId.
-   *
-   * @param containerID
-   * @return Set<DatanodeDetails>
-   */
-  public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
-      throws SCMException {
-    return containers.getContainerReplicas(containerID);
-  }
-
-  /**
-   * Add a container Replica for given DataNode.
-   *
-   * @param containerID
-   * @param dn
-   */
-  public void addContainerReplica(ContainerID containerID, DatanodeDetails dn) {
-    containers.addContainerReplica(containerID, dn);
-  }
-
-  /**
-   * Remove a container Replica for given DataNode.
-   *
-   * @param containerID
-   * @param dn
-   * @return True if the dataNode is removed successfully, else false.
-   */
-  public boolean removeContainerReplica(ContainerID containerID,
-      DatanodeDetails dn) throws SCMException {
-    return containers.removeContainerReplica(containerID, dn);
-  }
-
-  /**
-   * Compare the existing replication number with the expected one.
-   */
-  public ReplicationRequest checkReplicationState(ContainerID containerID)
-      throws SCMException {
-    int existingReplicas = getContainerReplicas(containerID).size();
-    int expectedReplicas = getContainer(containerID)
-        .getReplicationFactor().getNumber();
-    if (existingReplicas != expectedReplicas) {
-      return new ReplicationRequest(containerID.getId(), existingReplicas,
-          expectedReplicas);
-    }
-    return null;
-  }
-
-  /**
-   * Checks if the container is open.
-   */
-  public boolean isOpen(ContainerID containerID) {
-    Preconditions.checkNotNull(containerID);
-    ContainerInfo container = Preconditions
-        .checkNotNull(getContainer(containerID),
-            "Container can't be found " + containerID);
-    return container.isContainerOpen();
-  }
-
-  @VisibleForTesting
-  public ContainerStateMap getContainerStateMap() {
-    return containers;
-  }
-
-}
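
Note: the transitions registered in initializeStateMachine() above can be
exercised directly against the generic StateMachine class, using only the
constructor, addTransition and getNextState calls visible in this diff. A
minimal sketch of the happy-path create flow (the class name is illustrative;
the remaining transitions are elided):

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;

/** Sketch: ALLOCATED --CREATE--> CREATING --CREATED--> OPEN. */
final class LifecycleSketch {
  public static void main(String[] args)
      throws InvalidStateTransitionException {
    Set<LifeCycleState> finalStates = new HashSet<>();
    finalStates.add(LifeCycleState.OPEN);
    finalStates.add(LifeCycleState.CLOSED);
    finalStates.add(LifeCycleState.DELETED);

    StateMachine<LifeCycleState, LifeCycleEvent> sm =
        new StateMachine<>(LifeCycleState.ALLOCATED, finalStates);
    sm.addTransition(LifeCycleState.ALLOCATED, LifeCycleState.CREATING,
        LifeCycleEvent.CREATE);
    sm.addTransition(LifeCycleState.CREATING, LifeCycleState.OPEN,
        LifeCycleEvent.CREATED);

    // Walk the create flow; an unregistered (state, event) pair would
    // throw InvalidStateTransitionException instead.
    LifeCycleState s = sm.getNextState(LifeCycleState.ALLOCATED,
        LifeCycleEvent.CREATE);
    s = sm.getNextState(s, LifeCycleEvent.CREATED);
    System.out.println(s); // OPEN
  }
}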

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
deleted file mode 100644
index 5ed80cb..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Mapping class contains the mapping from a container name to its pipeline.
- * This is used by SCM when allocating new locations and when looking up a
- * key.
- */
-public interface Mapping extends Closeable {
-  /**
-   * Returns the ContainerInfo from the container ID.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerInfo such as creation state and the pipeline.
-   * @throws IOException
-   */
-  ContainerInfo getContainer(long containerID) throws IOException;
-
-  /**
-   * Returns the ContainerWithPipeline from the container ID.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerWithPipeline such as creation state and the pipeline.
-   * @throws IOException
-   */
-  ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException;
-
-  /**
-   * Returns containers under certain conditions.
-   * Searches container IDs from the start ID (exclusive); the size of the
-   * search range cannot exceed the value of count.
-   *
-   * @param startContainerID start containerID, >= 0;
-   *                         start searching at the head if 0.
-   * @param count count must be >= 0. Usually the count is set to a very
-   *              big value instead of being unlimited, in case the db is
-   *              very big.
-   *
-   * @return a list of containers.
-   * @throws IOException
-   */
-  List<ContainerInfo> listContainer(long startContainerID, int count)
-      throws IOException;
-
-  /**
-   * Allocates a new container for a given keyName and replication factor.
-   *
-   * @param type - replication type of the container.
-   * @param replicationFactor - replication factor of the container.
-   * @param owner - owner of the container.
-   * @return - ContainerWithPipeline.
-   * @throws IOException
-   */
-  ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor replicationFactor, String owner)
-      throws IOException;
-
-  /**
-   * Deletes a container from SCM.
-   *
-   * @param containerID - Container ID
-   * @throws IOException
-   */
-  void deleteContainer(long containerID) throws IOException;
-
-  /**
-   * Update container state.
-   * @param containerID - Container ID
-   * @param event - container life cycle event
-   * @return - new container state
-   * @throws IOException
-   */
-  HddsProtos.LifeCycleState updateContainerState(long containerID,
-      HddsProtos.LifeCycleEvent event) throws IOException;
-
-  /**
-   * Returns the container State Manager.
-   * @return ContainerStateManager
-   */
-  ContainerStateManager getStateManager();
-
-  /**
-   * Process container report from Datanode.
-   *
-   * @param datanodeDetails - Datanode that sent the report.
-   * @param reports Container report
-   * @param isRegisterCall - true if the report arrived with a register call.
-   */
-  void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports, boolean isRegisterCall)
-      throws IOException;
-
-  /**
-   * Update deleteTransactionId according to deleteTransactionMap.
-   *
-   * @param deleteTransactionMap Maps the containerId to latest delete
-   *                             transaction id for the container.
-   * @throws IOException
-   */
-  void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
-      throws IOException;
-
-  /**
-   * Returns a ContainerWithPipeline that matches the given size, owner,
-   * replication type, factor and state.
-   * @return ContainerWithPipeline
-   */
-  ContainerWithPipeline getMatchingContainerWithPipeline(long size,
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) throws IOException;
-
-  PipelineSelector getPipelineSelector();
-}
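
The listContainer contract above, with an exclusive start ID and a result
bounded by count, lends itself to straightforward paging. A hedged usage
sketch, assuming a Mapping instance named mapping and that ContainerInfo
exposes getContainerID(); the batch size is illustrative.

  // Page through all containers via the Mapping interface.
  long startId = 0;        // 0 means start searching at the head
  final int batch = 1000;  // cap each call, per the javadoc advice
  List<ContainerInfo> page;
  do {
    page = mapping.listContainer(startId, batch);
    for (ContainerInfo info : page) {
      System.out.println(info.getContainerID());
    }
    if (!page.isEmpty()) {
      // the start ID is exclusive, so resume after the last container seen
      startId = page.get(page.size() - 1).getContainerID();
    }
  } while (page.size() == batch);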

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java
deleted file mode 100644
index ee02bbd..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/**
- * This package has classes that close a container, that is, move a container
- * from the open state to the closed state.
- */
-package org.apache.hadoop.hdds.scm.container.closer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java
deleted file mode 100644
index 3f8d056..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container;
-/**
- * This package contains routines to manage the container location and
- * mapping inside SCM.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
deleted file mode 100644
index 3336c8e..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A ContainerPlacementPolicy supports choosing datanodes to build a
- * replication pipeline with specified constraints.
- */
-public interface ContainerPlacementPolicy {
-
-  /**
-   * Given the replication factor and size required, return a set of
-   * datanodes that satisfies the node count and size requirements.
-   *
-   * @param excludedNodes - list of nodes to be excluded.
-   * @param nodesRequired - number of datanodes required.
-   * @param sizeRequired - size required for the container or block.
-   * @return list of datanodes chosen.
-   * @throws IOException
-   */
-  List<DatanodeDetails> chooseDatanodes(List<DatanodeDetails> excludedNodes,
-      int nodesRequired, long sizeRequired)
-      throws IOException;
-}
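
A hedged call-site sketch for the interface above; the policy instance and
the exclusion list are assumptions made for illustration.

  // Ask a placement policy for three datanodes with 5 GB of free space,
  // excluding the nodes that already hold replicas.
  List<DatanodeDetails> targets = policy.chooseDatanodes(
      existingReplicaNodes,        // excludedNodes
      3,                           // nodesRequired, one per replica
      5L * 1024 * 1024 * 1024);    // sizeRequired, in bytes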

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
deleted file mode 100644
index 60861b7..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.stream.Collectors;
-
-/**
- * SCMCommonPolicy implements a set of invariants which are common
- * to all container placement policies and acts as the repository of helper
- * functions shared by placement policies.
- */
-public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
-  @VisibleForTesting
-  static final Logger LOG =
-      LoggerFactory.getLogger(SCMCommonPolicy.class);
-  private final NodeManager nodeManager;
-  private final Random rand;
-  private final Configuration conf;
-
-  /**
-   * Constructs SCM Common Policy Class.
-   *
-   * @param nodeManager NodeManager
-   * @param conf Configuration class.
-   */
-  public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) {
-    this.nodeManager = nodeManager;
-    this.rand = new Random();
-    this.conf = conf;
-  }
-
-  /**
-   * Return node manager.
-   *
-   * @return node manager
-   */
-  public NodeManager getNodeManager() {
-    return nodeManager;
-  }
-
-  /**
-   * Returns the Random Object.
-   *
-   * @return rand
-   */
-  public Random getRand() {
-    return rand;
-  }
-
-  /**
-   * Get Config.
-   *
-   * @return Configuration
-   */
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * Given the replication factor and size required, return a set of
-   * datanodes that satisfies the node count and size requirements.
-   * <p>
-   * Here are some invariants of container placement.
-   * <p>
-   * 1. We place containers only on healthy nodes.
-   * 2. We place containers on nodes with enough space for that container.
-   * 3. If a set of nodes is requested, we either meet the required
-   * number of nodes or we fail that request.
-   *
-   *
-   * @param excludedNodes - datanodes with existing replicas
-   * @param nodesRequired - number of datanodes required.
-   * @param sizeRequired - size required for the container or block.
-   * @return list of datanodes chosen.
-   * @throws SCMException SCM exception.
-   */
-
-  public List<DatanodeDetails> chooseDatanodes(
-      List<DatanodeDetails> excludedNodes,
-      int nodesRequired, final long sizeRequired) throws SCMException {
-    List<DatanodeDetails> healthyNodes =
-        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
-    healthyNodes.removeAll(excludedNodes);
-    String msg;
-    if (healthyNodes.size() == 0) {
-      msg = "No healthy node found to allocate container.";
-      LOG.error(msg);
-      throw new SCMException(msg, SCMException.ResultCodes
-          .FAILED_TO_FIND_HEALTHY_NODES);
-    }
-
-    if (healthyNodes.size() < nodesRequired) {
-      msg = String.format("Not enough healthy nodes to allocate container. %d "
-              + " datanodes required. Found %d",
-          nodesRequired, healthyNodes.size());
-      LOG.error(msg);
-      throw new SCMException(msg,
-          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
-    }
-    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
-        hasEnoughSpace(d, sizeRequired)).collect(Collectors.toList());
-
-    if (healthyList.size() < nodesRequired) {
-      msg = String.format("Unable to find enough nodes that meet the space " +
-              "requirement of %d bytes in healthy node set." +
-              " Nodes required: %d Found: %d",
-          sizeRequired, nodesRequired, healthyList.size());
-      LOG.error(msg);
-      throw new SCMException(msg,
-          SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE);
-    }
-
-    return healthyList;
-  }
-
-  /**
-   * Returns true if this node has enough space to meet our requirement.
-   *
-   * @param datanodeDetails DatanodeDetails
-   * @param sizeRequired - size required for the container or block.
-   * @return true if we have enough space.
-   */
-  private boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
-                                 long sizeRequired) {
-    SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
-    return (nodeMetric != null) && (nodeMetric.get() != null)
-        && nodeMetric.get().getRemaining().hasResources(sizeRequired);
-  }
-
-  /**
-   * This function invokes the derived class's chooseNode function to build a
-   * list of nodes. Then it verifies that the invoked policy was able to
-   * return the expected number of nodes.
-   *
-   * @param nodesRequired - Nodes Required
-   * @param healthyNodes - List of healthy nodes to choose from.
-   * @return List of Datanodes that can be used for placement.
-   * @throws SCMException
-   */
-  public List<DatanodeDetails> getResultSet(
-      int nodesRequired, List<DatanodeDetails> healthyNodes)
-      throws SCMException {
-    List<DatanodeDetails> results = new LinkedList<>();
-    for (int x = 0; x < nodesRequired; x++) {
-      // invoke the choose function defined in the derived classes.
-      DatanodeDetails nodeId = chooseNode(healthyNodes);
-      if (nodeId != null) {
-        results.add(nodeId);
-      }
-    }
-
-    if (results.size() < nodesRequired) {
-      LOG.error("Unable to find the required number of healthy nodes that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
-          nodesRequired, results.size());
-      throw new SCMException("Unable to find required number of nodes.",
-          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
-    }
-    return results;
-  }
-
-  /**
-   * Choose a datanode according to the policy, this function is implemented
-   * by the actual policy class. For example, PlacementCapacity or
-   * PlacementRandom.
-   *
-   * @param healthyNodes - Set of healthy nodes we can choose from.
-   * @return DatanodeDetails
-   */
-  public abstract DatanodeDetails chooseNode(
-      List<DatanodeDetails> healthyNodes);
-
-
-}
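
SCMCommonPolicy is a template: getResultSet() calls the subclass's
chooseNode() once per required node and relies on each call removing its
pick from the shared candidate list. The illustrative-only subclass below,
an assumption and not part of this commit, makes that contract explicit.

  // Sketch of a trivial derived policy that always takes the first
  // candidate; the real policies are SCMContainerPlacementCapacity and
  // SCMContainerPlacementRandom, shown later in this commit.
  public final class SCMContainerPlacementFirstFit extends SCMCommonPolicy {

    public SCMContainerPlacementFirstFit(NodeManager nodeManager,
        Configuration conf) {
      super(nodeManager, conf);
    }

    @Override
    public List<DatanodeDetails> chooseDatanodes(
        List<DatanodeDetails> excludedNodes, int nodesRequired,
        long sizeRequired) throws SCMException {
      // The base class enforces the health, count and space invariants.
      List<DatanodeDetails> healthyNodes =
          super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
      return getResultSet(nodesRequired, healthyNodes);
    }

    @Override
    public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
      // remove(0) both picks and removes the node, so repeated calls from
      // getResultSet() cannot return the same datanode twice.
      return healthyNodes.remove(0);
    }
  }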

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
deleted file mode 100644
index 8df8f6e..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Container placement policy that randomly chooses datanodes with remaining
- * space to satisfy the size constraints.
- * <p>
- * The algorithm is as follows: pick 2 random nodes from a given pool of
- * nodes and then pick the node with lower utilization. This leads to a
- * higher probability of nodes with lower utilization being picked.
- * <p>
- * For those wondering why we choose two nodes randomly and keep the node
- * with lower utilization: there are links to the original papers in
- * HDFS-11564.
- * <p>
- * A brief summary -- We rank the nodes on a scale from lowest utilized to
- * highest utilized; there are (s * (s - 1)) / 2 ways to build distinct
- * pairs of nodes. There are s - k pairs in which the rank k node is the
- * less utilized member of the pair, so the probability of picking the rank
- * k node is (2 * (s - k)) / (s * (s - 1)).
- * <p>
- * In English, there is a much higher probability of picking less utilized
- * nodes as compared to nodes with higher utilization, since we pick 2 nodes
- * and then keep the node with lower utilization.
- * <p>
- * This avoids the issue of users adding new nodes into the cluster and HDFS
- * sending all traffic to those nodes, as would happen with a purely
- * capacity-based allocation scheme. Unless a new node happens to be one of
- * the 2 randomly chosen nodes, it will not win the container outright.
- * <p>
- * This leads to an I/O pattern where the lower utilized nodes are favoured
- * more than higher utilized nodes, but part of the I/O will still go to the
- * older higher utilized nodes.
- * <p>
- * With this algorithm in place, our hope is that the balancer tool needs to
- * do little or no work and the cluster will achieve a balanced distribution
- * over time.
- */
-public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
-  @VisibleForTesting
-  static final Logger LOG =
-      LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
-
-  /**
-   * Constructs a Container Placement policy that considers only capacity.
-   * That is, this policy tries to place containers based on node weight.
-   *
-   * @param nodeManager Node Manager
-   * @param conf Configuration
-   */
-  public SCMContainerPlacementCapacity(final NodeManager nodeManager,
-      final Configuration conf) {
-    super(nodeManager, conf);
-  }
-
-  /**
-   * Called by SCM to choose datanodes.
-   *
-   *
-   * @param excludedNodes - list of the datanodes to exclude.
-   * @param nodesRequired - number of datanodes required.
-   * @param sizeRequired - size required for the container or block.
-   * @return List of datanodes.
-   * @throws SCMException  SCMException
-   */
-  @Override
-  public List<DatanodeDetails> chooseDatanodes(
-      List<DatanodeDetails> excludedNodes, final int nodesRequired,
-      final long sizeRequired) throws SCMException {
-    List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
-    if (healthyNodes.size() == nodesRequired) {
-      return healthyNodes;
-    }
-    return getResultSet(nodesRequired, healthyNodes);
-  }
-
-  /**
-   * Find a node from the healthy list and return it after removing it from the
-   * list that we are operating on.
-   *
-   * @param healthyNodes - List of healthy nodes that meet the size
-   * requirement.
-   * @return DatanodeDetails that is chosen.
-   */
-  @Override
-  public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
-    int firstNodeNdx = getRand().nextInt(healthyNodes.size());
-    int secondNodeNdx = getRand().nextInt(healthyNodes.size());
-
-    DatanodeDetails datanodeDetails;
-    // There is a possibility that both numbers will be the same.
-    // If that is so, we just return the node.
-    if (firstNodeNdx == secondNodeNdx) {
-      datanodeDetails = healthyNodes.get(firstNodeNdx);
-    } else {
-      DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
-      DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
-      SCMNodeMetric firstNodeMetric =
-          getNodeManager().getNodeStat(firstNodeDetails);
-      SCMNodeMetric secondNodeMetric =
-          getNodeManager().getNodeStat(secondNodeDetails);
-      datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
-          ? firstNodeDetails : secondNodeDetails;
-    }
-    healthyNodes.remove(datanodeDetails);
-    return datanodeDetails;
-  }
-}
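
The probability claim in the class comment above is easy to check
empirically. A quick Monte Carlo sketch, assuming s nodes ranked 1..s by
utilization with rank 1 the least utilized, and a pair of distinct nodes
drawn uniformly at random; the observed pick frequency of the rank k node
should approach 2 * (s - k) / (s * (s - 1)).

  import java.util.Random;

  public class TwoChoicesCheck {
    public static void main(String[] args) {
      final int s = 10;                 // number of ranked nodes
      final int trials = 1_000_000;
      int[] wins = new int[s + 1];
      Random rand = new Random();
      for (int t = 0; t < trials; t++) {
        int a = rand.nextInt(s) + 1;
        int b = rand.nextInt(s) + 1;
        while (b == a) {                // redraw until the pair is distinct
          b = rand.nextInt(s) + 1;
        }
        wins[Math.min(a, b)]++;         // the less utilized node wins
      }
      for (int k = 1; k <= s; k++) {
        double expected = 2.0 * (s - k) / (s * (s - 1.0));
        System.out.printf("k=%d observed=%.4f expected=%.4f%n",
            k, (double) wins[k] / trials, expected);
      }
    }
  }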

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
deleted file mode 100644
index 76702d5..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Container placement policy that randomly chooses healthy datanodes.
- * This is very similar to current HDFS placement. That is, we just randomly
- * place containers without any consideration of utilization.
- * <p>
- * That means we rely on balancer to achieve even distribution of data.
- * Balancer will need to support containers as a feature before this class
- * can be practically used.
- */
-public final class SCMContainerPlacementRandom extends SCMCommonPolicy
-    implements ContainerPlacementPolicy {
-  @VisibleForTesting
-  static final Logger LOG =
-      LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
-
-  /**
-   * Construct a random Block Placement policy.
-   *
-   * @param nodeManager nodeManager
-   * @param conf Config
-   */
-  public SCMContainerPlacementRandom(final NodeManager nodeManager,
-      final Configuration conf) {
-    super(nodeManager, conf);
-  }
-
-  /**
-   * Called by the SCM to choose datanodes.
-   *
-   *
-   * @param excludedNodes - list of the datanodes to exclude.
-   * @param nodesRequired - number of datanodes required.
-   * @param sizeRequired - size required for the container or block.
-   * @return List of Datanodes.
-   * @throws SCMException  SCMException
-   */
-  @Override
-  public List<DatanodeDetails> chooseDatanodes(
-      List<DatanodeDetails> excludedNodes, final int nodesRequired,
-      final long sizeRequired) throws SCMException {
-    List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
-
-    if (healthyNodes.size() == nodesRequired) {
-      return healthyNodes;
-    }
-    return getResultSet(nodesRequired, healthyNodes);
-  }
-
-  /**
-   * Just choose a node randomly and remove it from the set of nodes we can
-   * choose from.
-   *
-   * @param healthyNodes - all healthy datanodes.
-   * @return one randomly chosen datanode from the list of healthy nodes.
-   */
-  public DatanodeDetails chooseNode(final List<DatanodeDetails> healthyNodes) {
-    DatanodeDetails selectedNode =
-        healthyNodes.get(getRand().nextInt(healthyNodes.size()));
-    healthyNodes.remove(selectedNode);
-    return selectedNode;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
deleted file mode 100644
index 1cb810d..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
-// Various placement algorithms.
\ No newline at end of file

