http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ListContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ListContainerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ListContainerHandler.java
deleted file mode 100644
index d0ac974..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ListContainerHandler.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.scm.cli.container;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.ozone.scm.cli.OzoneCommandHandler;
-import org.apache.hadoop.ozone.web.utils.JsonUtils;
-import org.apache.hadoop.scm.client.ScmClient;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.apache.hadoop.ozone.scm.cli.SCMCLI.CMD_WIDTH;
-import static org.apache.hadoop.ozone.scm.cli.SCMCLI.HELP_OP;
-
-/**
- * This is the handler that process container list command.
- */
-public class ListContainerHandler extends OzoneCommandHandler {
-
-  public static final String CONTAINER_LIST = "list";
-  public static final String OPT_START_CONTAINER = "start";
-  public static final String OPT_PREFIX_CONTAINER = "prefix";
-  public static final String OPT_COUNT = "count";
-
-  /**
-   * Constructs a handler object.
-   *
-   * @param scmClient scm client
-   */
-  public ListContainerHandler(ScmClient scmClient) {
-    super(scmClient);
-  }
-
-  @Override
-  public void execute(CommandLine cmd) throws IOException {
-    if (!cmd.hasOption(CONTAINER_LIST)) {
-      throw new IOException("Expecting container list");
-    }
-    if (cmd.hasOption(HELP_OP)) {
-      displayHelp();
-      return;
-    }
-
-    if (!cmd.hasOption(OPT_COUNT)) {
-      displayHelp();
-      if (!cmd.hasOption(HELP_OP)) {
-        throw new IOException("Expecting container count");
-      } else {
-        return;
-      }
-    }
-
-    String startName = cmd.getOptionValue(OPT_START_CONTAINER);
-    String prefixName = cmd.getOptionValue(OPT_PREFIX_CONTAINER);
-    int count = 0;
-
-    if (cmd.hasOption(OPT_COUNT)) {
-      count = Integer.parseInt(cmd.getOptionValue(OPT_COUNT));
-      if (count < 0) {
-        displayHelp();
-        throw new IOException("-count should not be negative");
-      }
-    }
-
-    List<ContainerInfo> containerList =
-        getScmClient().listContainer(startName, prefixName, count);
-
-    // Output data list
-    for (ContainerInfo container : containerList) {
-      outputContainerPipeline(container.getPipeline());
-    }
-  }
-
-  private void outputContainerPipeline(Pipeline pipeline) throws IOException {
-    // Print container report info.
-    logOut("%s", JsonUtils.toJsonStringWithDefaultPrettyPrinter(
-        pipeline.toJsonString()));
-  }
-
-  @Override
-  public void displayHelp() {
-    Options options = new Options();
-    addOptions(options);
-    HelpFormatter helpFormatter = new HelpFormatter();
-    helpFormatter.printHelp(CMD_WIDTH, "hdfs scm -container -list <option>",
-        "where <option> can be the following", options, "");
-  }
-
-  public static void addOptions(Options options) {
-    Option startContainerOpt = new Option(OPT_START_CONTAINER,
-        true, "Specify start container name");
-    Option endContainerOpt = new Option(OPT_PREFIX_CONTAINER,
-        true, "Specify prefix container name");
-    Option countOpt = new Option(OPT_COUNT, true,
-        "Specify count number, required");
-    options.addOption(countOpt);
-    options.addOption(startContainerOpt);
-    options.addOption(endContainerOpt);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/package-info.java
deleted file mode 100644
index 1c9f40d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.scm.cli.container;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/package-info.java
deleted file mode 100644
index d1f9775..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.scm.cli;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
deleted file mode 100644
index 5950d9a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 - * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.scm.container;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.scm.container.closer.ContainerCloser;
-import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
-import org.apache.hadoop.ozone.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.scm.node.NodeManager;
-import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
-
-/**
- * Mapping class contains the mapping from a name to a pipeline mapping. This
- * is used by SCM when
- * allocating new locations and when looking up a key.
- */
-public class ContainerMapping implements Mapping {
-  private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping
-      .class);
-
-  private final NodeManager nodeManager;
-  private final long cacheSize;
-  private final Lock lock;
-  private final Charset encoding = Charset.forName("UTF-8");
-  private final MetadataStore containerStore;
-  private final PipelineSelector pipelineSelector;
-  private final ContainerStateManager containerStateManager;
-  private final LeaseManager<ContainerInfo> containerLeaseManager;
-  private final ContainerSupervisor containerSupervisor;
-  private final float containerCloseThreshold;
-  private final ContainerCloser closer;
-  private final long size;
-
-  /**
-   * Constructs a mapping class that creates mapping between container names
-   * and pipelines.
-   *
-   * @param nodeManager - NodeManager so that we can get the nodes that are
-   * healthy to place new
-   * containers.
-   * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
-   * its nodes. This is
-   * passed to LevelDB and this memory is allocated in Native code space.
-   * CacheSize is specified
-   * in MB.
-   * @throws IOException on Failure.
-   */
-  @SuppressWarnings("unchecked")
-  public ContainerMapping(
-      final Configuration conf, final NodeManager nodeManager, final int
-      cacheSizeMB) throws IOException {
-    this.nodeManager = nodeManager;
-    this.cacheSize = cacheSizeMB;
-    this.closer = new ContainerCloser(nodeManager, conf);
-
-    File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
-
-    // Write the container name to pipeline mapping.
-    File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
-    containerStore =
-        MetadataStoreBuilder.newBuilder()
-            .setConf(conf)
-            .setDbFile(containerDBPath)
-            .setCacheSize(this.cacheSize * OzoneConsts.MB)
-            .build();
-
-    this.lock = new ReentrantLock();
-
-    this.pipelineSelector = new PipelineSelector(nodeManager, conf);
-
-    // To be replaced with code getStorageSize once it is committed.
-    size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB,
-        OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
-    this.containerStateManager =
-        new ContainerStateManager(conf, this);
-    this.containerSupervisor =
-        new ContainerSupervisor(conf, nodeManager,
-            nodeManager.getNodePoolManager());
-    this.containerCloseThreshold = conf.getFloat(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
-    LOG.trace("Container State Manager created.");
-
-    long containerCreationLeaseTimeout = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    LOG.trace("Starting Container Lease Manager.");
-    containerLeaseManager = new LeaseManager<>(containerCreationLeaseTimeout);
-    containerLeaseManager.start();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public ContainerInfo getContainer(final String containerName) throws
-      IOException {
-    ContainerInfo containerInfo;
-    lock.lock();
-    try {
-      byte[] containerBytes = containerStore.get(containerName.getBytes(
-          encoding));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified key does not exist. key : " + containerName,
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-
-      OzoneProtos.SCMContainerInfo temp = OzoneProtos.SCMContainerInfo.PARSER
-          .parseFrom(containerBytes);
-      containerInfo = ContainerInfo.fromProtobuf(temp);
-      return containerInfo;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<ContainerInfo> listContainer(String startName,
-      String prefixName, int count) throws IOException {
-    List<ContainerInfo> containerList = new ArrayList<>();
-    lock.lock();
-    try {
-      if (containerStore.isEmpty()) {
-        throw new IOException("No container exists in current db");
-      }
-      MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
-      byte[] startKey = startName == null ? null : DFSUtil.string2Bytes(
-          startName);
-      List<Map.Entry<byte[], byte[]>> range =
-          containerStore.getSequentialRangeKVs(startKey, count, prefixFilter);
-
-      // Transform the values into the pipelines.
-      // TODO: filter by container state
-      for (Map.Entry<byte[], byte[]> entry : range) {
-        ContainerInfo containerInfo =
-            ContainerInfo.fromProtobuf(
-                OzoneProtos.SCMContainerInfo.PARSER.parseFrom(
-                    entry.getValue()));
-        Preconditions.checkNotNull(containerInfo);
-        containerList.add(containerInfo);
-      }
-    } finally {
-      lock.unlock();
-    }
-    return containerList;
-  }
-
-  /**
-   * Allocates a new container.
-   *
-   * @param replicationFactor - replication factor of the container.
-   * @param containerName - Name of the container.
-   * @param owner - The string name of the Service that owns this container.
-   * @return - Pipeline that makes up this container.
-   * @throws IOException - Exception
-   */
-  @Override
-  public ContainerInfo allocateContainer(
-      ReplicationType type,
-      ReplicationFactor replicationFactor,
-      final String containerName,
-      String owner)
-      throws IOException {
-    Preconditions.checkNotNull(containerName);
-    Preconditions.checkState(!containerName.isEmpty());
-
-    ContainerInfo containerInfo;
-    if (!nodeManager.isOutOfChillMode()) {
-      throw new SCMException(
-          "Unable to create container while in chill mode",
-          SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
-    }
-
-    lock.lock();
-    try {
-      byte[] containerBytes = containerStore.get(containerName.getBytes(
-          encoding));
-      if (containerBytes != null) {
-        throw new SCMException(
-            "Specified container already exists. key : " + containerName,
-            SCMException.ResultCodes.CONTAINER_EXISTS);
-      }
-      containerInfo =
-          containerStateManager.allocateContainer(
-              pipelineSelector, type, replicationFactor, containerName,
-              owner);
-      containerStore.put(
-          containerName.getBytes(encoding), containerInfo.getProtobuf()
-              .toByteArray());
-    } finally {
-      lock.unlock();
-    }
-    return containerInfo;
-  }
-
-  /**
-   * Deletes a container from SCM.
-   *
-   * @param containerName - Container name
-   * @throws IOException if container doesn't exist or container store failed
-   *                     to delete the
-   *                     specified key.
-   */
-  @Override
-  public void deleteContainer(String containerName) throws IOException {
-    lock.lock();
-    try {
-      byte[] dbKey = containerName.getBytes(encoding);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to delete container " + containerName + ", reason : " +
-                "container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      containerStore.delete(dbKey);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc} Used by client to update container state on SCM.
-   */
-  @Override
-  public OzoneProtos.LifeCycleState updateContainerState(
-      String containerName, OzoneProtos.LifeCycleEvent event) throws
-      IOException {
-    ContainerInfo containerInfo;
-    lock.lock();
-    try {
-      byte[] dbKey = containerName.getBytes(encoding);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to update container state"
-                + containerName
-                + ", reason : container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      containerInfo =
-          ContainerInfo.fromProtobuf(OzoneProtos.SCMContainerInfo.PARSER
-              .parseFrom(containerBytes));
-
-      Preconditions.checkNotNull(containerInfo);
-      switch (event) {
-      case CREATE:
-        // Acquire lease on container
-        Lease<ContainerInfo> containerLease =
-            containerLeaseManager.acquire(containerInfo);
-        // Register callback to be executed in case of timeout
-        containerLease.registerCallBack(() -> {
-          updateContainerState(containerName,
-              OzoneProtos.LifeCycleEvent.TIMEOUT);
-          return null;
-        });
-        break;
-      case CREATED:
-        // Release the lease on container
-        containerLeaseManager.release(containerInfo);
-        break;
-      case FINALIZE:
-        // TODO: we don't need a lease manager here for closing as the
-        // container report will include the container state after HDFS-13008
-        // If a client failed to update the container close state, DN container
-        // report from 3 DNs will be used to close the container eventually.
-        break;
-      case CLOSE:
-        break;
-      case UPDATE:
-        break;
-      case DELETE:
-        break;
-      case TIMEOUT:
-        break;
-      case CLEANUP:
-        break;
-      default:
-        throw new SCMException("Unsupported container LifeCycleEvent.",
-            FAILED_TO_CHANGE_CONTAINER_STATE);
-      }
-      // If the below updateContainerState call fails, we should revert the
-      // changes made in switch case.
-      // Like releasing the lease in case of BEGIN_CREATE.
-      ContainerInfo updatedContainer = containerStateManager
-          .updateContainerState(containerInfo, event);
-      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
-      return updatedContainer.getState();
-    } catch (LeaseException e) {
-      throw new IOException("Lease Exception.", e);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Returns the container State Manager.
-   *
-   * @return ContainerStateManager
-   */
-  @Override
-  public ContainerStateManager getStateManager() {
-    return containerStateManager;
-  }
-
-  /**
-   * Process container report from Datanode.
-   * <p>
-   * Processing follows a very simple logic for time being.
-   * <p>
-   * 1. Datanodes report the current State -- denoted by the datanodeState
-   * <p>
-   * 2. We are the older SCM state from the Database -- denoted by
-   * the knownState.
-   * <p>
-   * 3. We copy the usage etc. from currentState to newState and log that
-   * newState to the DB. This allows us SCM to bootup again and read the
-   * state of the world from the DB, and then reconcile the state from
-   * container reports, when they arrive.
-   *
-   * @param reports Container report
-   */
-  @Override
-  public void processContainerReports(ContainerReportsRequestProto reports)
-      throws IOException {
-    List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
-        containerInfos = reports.getReportsList();
-    containerSupervisor.handleContainerReport(reports);
-    for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
-        containerInfos) {
-      byte[] dbKey = datanodeState.getContainerNameBytes().toByteArray();
-      lock.lock();
-      try {
-        byte[] containerBytes = containerStore.get(dbKey);
-        if (containerBytes != null) {
-          OzoneProtos.SCMContainerInfo knownState =
-              OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
-
-          OzoneProtos.SCMContainerInfo newState =
-              reconcileState(datanodeState, knownState);
-
-          // FIX ME: This can be optimized, we write twice to memory, where a
-          // single write would work well.
-          //
-          // We need to write this to DB again since the closed only write
-          // the updated State.
-          containerStore.put(dbKey, newState.toByteArray());
-
-          // If the container is closed, then state is already written to SCM
-          // DB.TODO: So can we can write only once to DB.
-          if (closeContainerIfNeeded(newState)) {
-            LOG.info("Closing the Container: {}", newState.getContainerName());
-          }
-        } else {
-          // Container not found in our container db.
-          LOG.error("Error while processing container report from datanode :" +
-                  " {}, for container: {}, reason: container doesn't exist in" +
-                  "container database.", reports.getDatanodeID(),
-              datanodeState.getContainerName());
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-  }
-
-  /**
-   * Reconciles the state from Datanode with the state in SCM.
-   *
-   * @param datanodeState - State from the Datanode.
-   * @param knownState - State inside SCM.
-   * @return new SCM State for this container.
-   */
-  private OzoneProtos.SCMContainerInfo reconcileState(
-      StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
-      OzoneProtos.SCMContainerInfo knownState) {
-    OzoneProtos.SCMContainerInfo.Builder builder =
-        OzoneProtos.SCMContainerInfo.newBuilder();
-    builder.setContainerName(knownState.getContainerName());
-    builder.setPipeline(knownState.getPipeline());
-    // If used size is greater than allocated size, we will be updating
-    // allocated size with used size. This update is done as a fallback
-    // mechanism in case SCM crashes without properly updating allocated
-    // size. Correct allocated value will be updated by
-    // ContainerStateManager during SCM shutdown.
-    long usedSize = datanodeState.getUsed();
-    long allocated = knownState.getAllocatedBytes() > usedSize ?
-        knownState.getAllocatedBytes() : usedSize;
-    builder.setAllocatedBytes(allocated);
-    builder.setUsedBytes(usedSize);
-    builder.setNumberOfKeys(datanodeState.getKeyCount());
-    builder.setState(knownState.getState());
-    builder.setStateEnterTime(knownState.getStateEnterTime());
-    builder.setContainerID(knownState.getContainerID());
-    if (knownState.getOwner() != null) {
-      builder.setOwner(knownState.getOwner());
-    }
-    return builder.build();
-  }
-
-  /**
-   * Queues the close container command, to datanode and writes the new state
-   * to container DB.
-   * <p>
-   * TODO : Remove this 2 ContainerInfo definitions. It is brain dead to have
-   * one protobuf in one file and another definition in another file.
-   *
-   * @param newState - This is the state we maintain in SCM.
-   * @throws IOException
-   */
-  private boolean closeContainerIfNeeded(OzoneProtos.SCMContainerInfo newState)
-      throws IOException {
-    float containerUsedPercentage = 1.0f *
-        newState.getUsedBytes() / this.size;
-
-    ContainerInfo scmInfo = getContainer(newState.getContainerName());
-    if (containerUsedPercentage >= containerCloseThreshold
-        && !isClosed(scmInfo)) {
-      // We will call closer till get to the closed state.
-      // That is SCM will make this call repeatedly until we reach the closed
-      // state.
-      closer.close(newState);
-
-      if (shouldClose(scmInfo)) {
-        // This event moves the Container from Open to Closing State, this is
-        // a state inside SCM. This is the desired state that SCM wants this
-        // container to reach. We will know that a container has reached the
-        // closed state from container reports. This state change should be
-        // invoked once and only once.
-        OzoneProtos.LifeCycleState state = updateContainerState(
-            scmInfo.getContainerName(),
-            OzoneProtos.LifeCycleEvent.FINALIZE);
-        if (state != OzoneProtos.LifeCycleState.CLOSING) {
-          LOG.error("Failed to close container {}, reason : Not able " +
-                  "to " +
-                  "update container state, current container state: {}.",
-              newState.getContainerName(), state);
-          return false;
-        }
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * In Container is in closed state, if it is in closed, Deleting or Deleted
-   * State.
-   *
-   * @param info - ContainerInfo.
-   * @return true if is in open state, false otherwise
-   */
-  private boolean shouldClose(ContainerInfo info) {
-    return info.getState() == OzoneProtos.LifeCycleState.OPEN;
-  }
-
-  private boolean isClosed(ContainerInfo info) {
-    return info.getState() == OzoneProtos.LifeCycleState.CLOSED;
-  }
-
-  @VisibleForTesting
-  public ContainerCloser getCloser() {
-    return closer;
-  }
-
-  /**
-   * Closes this stream and releases any system resources associated with it.
-   * If the stream is
-   * already closed then invoking this method has no effect.
-   * <p>
-   * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
-   * fail require careful
-   * attention. It is strongly advised to relinquish the underlying resources
-   * and to internally
-   * <em>mark</em> the {@code Closeable} as closed, prior to throwing the
-   * {@code IOException}.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override
-  public void close() throws IOException {
-    if (containerLeaseManager != null) {
-      containerLeaseManager.shutdown();
-    }
-    if (containerStateManager != null) {
-      flushContainerInfo();
-      containerStateManager.close();
-    }
-    if (containerStore != null) {
-      containerStore.close();
-    }
-  }
-
-  /**
-   * Since allocatedBytes of a container is only in memory, stored in
-   * containerStateManager, when closing ContainerMapping, we need to update
-   * this in the container store.
-   *
-   * @throws IOException on failure.
-   */
-  @VisibleForTesting
-  public void flushContainerInfo() throws IOException {
-    List<ContainerInfo> containers = containerStateManager.getAllContainers();
-    List<String> failedContainers = new ArrayList<>();
-    for (ContainerInfo info : containers) {
-      // even if some container updated failed, others can still proceed
-      try {
-        byte[] dbKey = info.getContainerName().getBytes(encoding);
-        byte[] containerBytes = containerStore.get(dbKey);
-        // TODO : looks like when a container is deleted, the container is
-        // removed from containerStore but not containerStateManager, so it can
-        // return info of a deleted container. may revisit this in the future,
-        // for now, just skip a not-found container
-        if (containerBytes != null) {
-          OzoneProtos.SCMContainerInfo oldInfoProto =
-              OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
-          ContainerInfo oldInfo = ContainerInfo.fromProtobuf(oldInfoProto);
-          ContainerInfo newInfo = new ContainerInfo.Builder()
-              .setAllocatedBytes(info.getAllocatedBytes())
-              .setContainerName(oldInfo.getContainerName())
-              .setNumberOfKeys(oldInfo.getNumberOfKeys())
-              .setOwner(oldInfo.getOwner())
-              .setPipeline(oldInfo.getPipeline())
-              .setState(oldInfo.getState())
-              .setUsedBytes(oldInfo.getUsedBytes())
-              .build();
-          containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
-        } else {
-          LOG.debug("Container state manager has container {} but not found " +
-                  "in container store, a deleted container?",
-              info.getContainerName());
-        }
-      } catch (IOException ioe) {
-        failedContainers.add(info.getContainerName());
-      }
-    }
-    if (!failedContainers.isEmpty()) {
-      throw new IOException("Error in flushing container info from container " +
-          "state manager: " + failedContainers);
-    }
-  }
-
-  @VisibleForTesting
-  public MetadataStore getContainerStore() {
-    return containerStore;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
deleted file mode 100644
index c586107..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.scm.container;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
-import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
-import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerState;
-import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerStateMap;
-import org.apache.hadoop.ozone.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
-
-/**
- * A container state manager keeps track of container states and returns
- * containers that match various queries.
- * <p>
- * This state machine is driven by a combination of server and client actions.
- * <p>
- * This is how a create container happens: 1. When a container is created, the
- * Server(or SCM) marks that Container as ALLOCATED state. In this state, SCM
- * has chosen a pipeline for container to live on. However, the container is 
not
- * created yet. This container along with the pipeline is returned to the
- * client.
- * <p>
- * 2. The client when it sees the Container state as ALLOCATED understands that
- * container needs to be created on the specified pipeline. The client lets the
- * SCM know that saw this flag and is initiating the on the data nodes.
- * <p>
- * This is done by calling into notifyObjectCreation(ContainerName,
- * BEGIN_CREATE) flag. When SCM gets this call, SCM puts the container state
- * into CREATING. All this state means is that SCM told Client to create a
- * container and client saw that request.
- * <p>
- * 3. Then client makes calls to datanodes directly, asking the datanodes to
- * create the container. This is done with the help of pipeline that supports
- * this container.
- * <p>
- * 4. Once the creation of the container is complete, the client will make
- * another call to the SCM, this time specifying the containerName and the
- * COMPLETE_CREATE as the Event.
- * <p>
- * 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
- * the state when clients can write to a container.
- * <p>
- * 6. If the client does not respond with the COMPLETE_CREATE event with a
- * certain time, the state machine times out and triggers a delete operation of
- * the container.
- * <p>
- * Please see the function initializeStateMachine below to see how this looks 
in
- * code.
- * <p>
- * Reusing existing container :
- * <p>
- * The create container call is not made all the time, the system tries to use
- * open containers as much as possible. So in those cases, it looks thru the
- * list of open containers and will return containers that match the specific
- * signature.
- * <p>
- * Please note : Logically there are 3 separate state machines in the case of
- * containers.
- * <p>
- * The Create State Machine -- Commented extensively above.
- * <p>
- * Open/Close State Machine - Once the container is in the Open State,
- * eventually it will be closed, once sufficient data has been written to it.
- * <p>
- * TimeOut Delete Container State Machine - if the container creating times 
out,
- * then Container State manager decides to delete the container.
- */
-public class ContainerStateManager implements Closeable {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerStateManager.class);
-
-  private final StateMachine<OzoneProtos.LifeCycleState,
-      OzoneProtos.LifeCycleEvent> stateMachine;
-
-  private final long containerSize;
-  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
-  private final ContainerStateMap containers;
-  private final AtomicLong containerCount;
-
-  /**
-   * Constructs a Container State Manager that tracks all containers owned by
-   * SCM for the purpose of allocation of blocks.
-   * <p>
-   * TODO : Add Container Tags so we know which containers are owned by SCM.
-   */
-  @SuppressWarnings("unchecked")
-  public ContainerStateManager(Configuration configuration,
-      Mapping containerMapping) {
-
-    // Initialize the container state machine.
-    Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
-
-    // These are the steady states of a container.
-    finalStates.add(LifeCycleState.OPEN);
-    finalStates.add(LifeCycleState.CLOSED);
-    finalStates.add(LifeCycleState.DELETED);
-
-    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
-        finalStates);
-    initializeStateMachine();
-
-    this.containerSize = OzoneConsts.GB * configuration.getInt(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
-
-    lastUsedMap = new ConcurrentHashMap<>();
-    containerCount = new AtomicLong(0);
-    containers = new ContainerStateMap();
-    loadExistingContainers(containerMapping);
-  }
-
-  private void loadExistingContainers(Mapping containerMapping) {
-
-    List<ContainerInfo> containerList;
-    try {
-      containerList = containerMapping.listContainer(null,
-          null, Integer.MAX_VALUE);
-
-      // if there are no container to load, let us return.
-      if (containerList == null || containerList.size() == 0) {
-        LOG.info("No containers to load for this cluster.");
-        return;
-      }
-    } catch (IOException e) {
-      if (!e.getMessage().equals("No container exists in current db")) {
-        LOG.error("Could not list the containers", e);
-      }
-      return;
-    }
-
-    try {
-      long maxID = 0;
-      for (ContainerInfo container : containerList) {
-        containers.addContainer(container);
-
-        if (maxID < container.getContainerID()) {
-          maxID = container.getContainerID();
-        }
-
-        containerCount.set(maxID);
-      }
-    } catch (SCMException ex) {
-      LOG.error("Unable to create a container information. ", ex);
-      // Fix me, what is the proper shutdown procedure for SCM ??
-      // System.exit(1) // Should we exit here?
-    }
-  }
-
-  /**
-   * Return the info of all the containers kept by the in-memory mapping.
-   *
-   * @return the list of all container info.
-   */
-  public List<ContainerInfo> getAllContainers() {
-    List<ContainerInfo> list = new ArrayList<>();
-
-    //No Locking needed since the return value is an immutable map.
-    containers.getContainerMap().forEach((key, value) -> list.add(value));
-    return list;
-  }
-
-  /*
-   *
-   * Event and State Transition Mapping:
-   *
-   * State: ALLOCATED ---------------> CREATING
-   * Event:                CREATE
-   *
-   * State: CREATING  ---------------> OPEN
-   * Event:               CREATED
-   *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
-   *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
-   *
-   * State: CLOSED   ----------------> DELETING
-   * Event:                DELETE
-   *
-   * State: DELETING ----------------> DELETED
-   * Event:               CLEANUP
-   *
-   * State: CREATING  ---------------> DELETING
-   * Event:               TIMEOUT
-   *
-   *
-   * Container State Flow:
-   *
-   * 
[ALLOCATED]------->[CREATING]--------->[OPEN]---------->[CLOSING]------->[CLOSED]
-   *            (CREATE)     |    (CREATED)       (FINALIZE)          (CLOSE)  
  |
-   *                         |                                                 
  |
-   *                         |                                                 
  |
-   *                         |(TIMEOUT)                                  
(DELETE)|
-   *                         |                                                 
  |
-   *                         +------------------> [DELETING] 
<-------------------+
-   *                                                   |
-   *                                                   |
-   *                                          (CLEANUP)|
-   *                                                   |
-   *                                               [DELETED]
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(LifeCycleState.ALLOCATED,
-        LifeCycleState.CREATING,
-        LifeCycleEvent.CREATE);
-
-    stateMachine.addTransition(LifeCycleState.CREATING,
-        LifeCycleState.OPEN,
-        LifeCycleEvent.CREATED);
-
-    stateMachine.addTransition(LifeCycleState.OPEN,
-        LifeCycleState.CLOSING,
-        LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSING,
-        LifeCycleState.CLOSED,
-        LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSED,
-        LifeCycleState.DELETING,
-        LifeCycleEvent.DELETE);
-
-    stateMachine.addTransition(LifeCycleState.CREATING,
-        LifeCycleState.DELETING,
-        LifeCycleEvent.TIMEOUT);
-
-    stateMachine.addTransition(LifeCycleState.DELETING,
-        LifeCycleState.DELETED,
-        LifeCycleEvent.CLEANUP);
-  }
-
-  /**
-   * allocates a new container based on the type, replication etc.
-   *
-   * @param selector -- Pipeline selector class.
-   * @param type -- Replication type.
-   * @param replicationFactor - Replication replicationFactor.
-   * @param containerName - Container Name.
-   * @return Container Info.
-   * @throws IOException  on Failure.
-   */
-  public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
-      .ReplicationType type, OzoneProtos.ReplicationFactor replicationFactor,
-      final String containerName, String owner) throws
-      IOException {
-
-    Pipeline pipeline = selector.getReplicationPipeline(type,
-        replicationFactor, containerName);
-    ContainerInfo containerInfo = new ContainerInfo.Builder()
-        .setContainerName(containerName)
-        .setState(OzoneProtos.LifeCycleState.ALLOCATED)
-        .setPipeline(pipeline)
-        // This is bytes allocated for blocks inside container, not the
-        // container size
-        .setAllocatedBytes(0)
-        .setUsedBytes(0)
-        .setNumberOfKeys(0)
-        .setStateEnterTime(Time.monotonicNow())
-        .setOwner(owner)
-        .setContainerID(containerCount.incrementAndGet())
-        .build();
-    Preconditions.checkNotNull(containerInfo);
-    containers.addContainer(containerInfo);
-    LOG.trace("New container allocated: {}", containerInfo);
-    return containerInfo;
-  }
-
-  /**
-   * Update the Container State to the next state.
-   *
-   * @param info - ContainerInfo
-   * @param event - LifeCycle Event
-   * @return Updated ContainerInfo.
-   * @throws SCMException  on Failure.
-   */
-  public ContainerInfo updateContainerState(ContainerInfo
-      info, OzoneProtos.LifeCycleEvent event) throws SCMException {
-    LifeCycleState newState;
-    try {
-      newState = this.stateMachine.getNextState(info.getState(), event);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update container state %s, " +
-              "reason: invalid state transition from state: %s upon " +
-              "event: %s.",
-          info.getPipeline().getContainerName(), info.getState(), event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
-    }
-
-    // This is a post condition after executing getNextState.
-    Preconditions.checkNotNull(newState);
-    containers.updateState(info, info.getState(), newState);
-    return containers.getContainerInfo(info);
-  }
-
-  /**
-   * Update the container State.
-   * @param info - Container Info
-   * @return  ContainerInfo
-   * @throws SCMException - on Error.
-   */
-  public ContainerInfo updateContainerInfo(ContainerInfo info)
-      throws SCMException {
-    containers.updateContainerInfo(info);
-    return containers.getContainerInfo(info);
-  }
-
-
-  /**
-   * Return a container matching the attributes specified.
-   *
-   * @param size - Space needed in the Container.
-   * @param owner - Owner of the container - A specific nameservice.
-   * @param type - Replication Type {StandAlone, Ratis}
-   * @param factor - Replication Factor {ONE, THREE}
-   * @param state - State of the Container-- {Open, Allocated etc.}
-   * @return ContainerInfo, null if there is no match found.
-   */
-  public ContainerInfo getMatchingContainer(final long size,
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) {
-
-    // Find containers that match the query spec, if no match return null.
-    NavigableSet<ContainerID> matchingSet =
-        containers.getMatchingContainerIDs(state, owner, factor, type);
-    if (matchingSet == null || matchingSet.size() == 0) {
-      return null;
-    }
-
-    // Get the last used container and find container above the last used
-    // container ID.
-    ContainerState key = new ContainerState(owner, type, factor);
-    ContainerID lastID = lastUsedMap.get(key);
-    if(lastID == null) {
-      lastID = matchingSet.first();
-    }
-
-    // There is a small issue here. The first time, we will skip the first
-    // container. But in most cases it will not matter.
-    NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
-    if (resultSet.size() == 0) {
-      resultSet = matchingSet;
-    }
-
-    ContainerInfo selectedContainer =
-        findContainerWithSpace(size, resultSet, owner);
-    if (selectedContainer == null) {
-
-      // If we did not find any space in the tailSet, we need to look for
-      // space in the headset, we need to pass true to deal with the
-      // situation that we have a lone container that has space. That is we
-      // ignored the last used container under the assumption we can find
-      // other containers with space, but if have a single container that is
-      // not true. Hence we need to include the last used container as the
-      // last element in the sorted set.
-
-      resultSet = matchingSet.headSet(lastID, true);
-      selectedContainer = findContainerWithSpace(size, resultSet, owner);
-    }
-    // Update the allocated Bytes on this container.
-    if(selectedContainer != null) {
-      selectedContainer.updateAllocatedBytes(size);
-    }
-    return selectedContainer;
-
-  }
-
-  private ContainerInfo findContainerWithSpace(long size,
-      NavigableSet<ContainerID> searchSet, String owner) {
-    // Get the container with space to meet our request.
-    for (ContainerID id : searchSet) {
-      ContainerInfo containerInfo = containers.getContainerInfo(id.getId());
-      if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
-        containerInfo.updateLastUsedTime();
-
-        ContainerState key = new ContainerState(owner,
-            containerInfo.getPipeline().getType(),
-            containerInfo.getPipeline().getFactor());
-        lastUsedMap.put(key, containerInfo.containerID());
-        return containerInfo;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Returns a set of ContainerIDs that match the Container.
-   *
-   * @param owner  Owner of the Containers.
-   * @param type - Replication Type of the containers
-   * @param factor - Replication factor of the containers.
-   * @param state - Current State, like Open, Close etc.
-   * @return Set of containers that match the specific query parameters.
-   */
-  public NavigableSet<ContainerID> getMatchingContainerIDs(
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) {
-    return containers.getMatchingContainerIDs(state, owner,
-        factor, type);
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
deleted file mode 100644
index 1372e7f..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-package org.apache.hadoop.ozone.scm.container.ContainerStates;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.scm.exceptions.SCMException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
-
-/**
- * Each Attribute that we manage for a container is maintained as a map.
- * <p>
- * Currently we manage the following attributes for a container.
- * <p>
- * 1. StateMap - LifeCycleState -> Set of ContainerIDs
- * 2. TypeMap  - ReplicationType -> Set of ContainerIDs
- * 3. OwnerMap - OwnerNames -> Set of ContainerIDs
- * 4. FactorMap - ReplicationFactor -> Set of ContainerIDs
- * <p>
- * This means that for a cluster size of 750 PB -- we will have around 150
- * Million containers, if we assume 5GB average container size.
- * <p>
- * That implies that these maps will take around 2/3 GB of RAM which will be
- * pinned down in the SCM. This is deemed acceptable since we can tune the
- * container size --say we make it 10GB average size, then we can deal with a
- * cluster size of 1.5 exa bytes with the same metadata in SCMs memory.
- * <p>
- * Please note: **This class is not thread safe**. This used to be thread safe,
- * while bench marking we found that ContainerStateMap would be taking 5
- * locks for a single container insert. If we remove locks in this class,
- * then we are able to perform about 540K operations per second, with the
- * locks in this class it goes down to 246K operations per second. Hence we
- * are going to rely on ContainerStateMap locks to maintain consistency of
- * data in these classes too, since ContainerAttribute is only used by
- * ContainerStateMap class.
- */
-public class ContainerAttribute<T> {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerAttribute.class);
-
-  private final Map<T, NavigableSet<ContainerID>> attributeMap;
-  private static final NavigableSet<ContainerID> EMPTY_SET =  Collections
-      .unmodifiableNavigableSet(new TreeSet<>());
-
-  /**
-   * Creates a Container Attribute map from an existing Map.
-   *
-   * @param attributeMap - AttributeMap
-   */
-  public ContainerAttribute(Map<T, NavigableSet<ContainerID>> attributeMap) {
-    this.attributeMap = attributeMap;
-  }
-
-  /**
-   * Create an empty Container Attribute map.
-   */
-  public ContainerAttribute() {
-    this.attributeMap = new HashMap<>();
-  }
-
-  /**
-   * Insert or update the value in the Attribute map.
-   *
-   * @param key - The key to the set where the ContainerID should exist.
-   * @param value - Actual Container ID.
-   * @throws SCMException - on Error
-   */
-  public boolean insert(T key, ContainerID value) throws SCMException {
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(value);
-
-    if (attributeMap.containsKey(key)) {
-      if (attributeMap.get(key).add(value)) {
-        return true; //we inserted the value as it doesn’t exist in the set.
-      } else { // Failure indicates that this ContainerID exists in the Set
-        if (!attributeMap.get(key).remove(value)) {
-          LOG.error("Failure to remove the object from the Map.Key:{}, " +
-              "ContainerID: {}", key, value);
-          throw new SCMException("Failure to remove the object from the Map",
-              FAILED_TO_CHANGE_CONTAINER_STATE);
-        }
-        attributeMap.get(key).add(value);
-        return true;
-      }
-    } else {
-      // This key does not exist, we need to allocate this key in the map.
-      // TODO: Replace TreeSet with FoldedTreeSet from HDFS Utils.
-      // Skipping for now, since FoldedTreeSet does not have implementations
-      // for headSet and TailSet. We need those calls.
-      this.attributeMap.put(key, new TreeSet<>());
-      // This should not fail, we just allocated this object.
-      attributeMap.get(key).add(value);
-      return true;
-    }
-  }
-
-  /**
-   * Returns true if have this bucket in the attribute map.
-   *
-   * @param key - Key to lookup
-   * @return true if we have the key
-   */
-  public boolean hasKey(T key) {
-    Preconditions.checkNotNull(key);
-    return this.attributeMap.containsKey(key);
-  }
-
-  /**
-   * Returns true if we have the key and the containerID in the bucket.
-   *
-   * @param key - Key to the bucket
-   * @param id - container ID that we want to lookup
-   * @return true or false
-   */
-  public boolean hasContainerID(T key, ContainerID id) {
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(id);
-
-    return this.attributeMap.containsKey(key) &&
-        this.attributeMap.get(key).contains(id);
-  }
-
-  /**
-   * Returns true if we have the key and the containerID in the bucket.
-   *
-   * @param key - Key to the bucket
-   * @param id - container ID that we want to lookup
-   * @return true or false
-   */
-  public boolean hasContainerID(T key, int id) {
-    return hasContainerID(key, ContainerID.valueof(id));
-  }
-
-  /**
-   * Clears all entries for this key type.
-   *
-   * @param key - Key that identifies the Set.
-   */
-  public void clearSet(T key) {
-    Preconditions.checkNotNull(key);
-
-    if (attributeMap.containsKey(key)) {
-      attributeMap.get(key).clear();
-    } else {
-      LOG.debug("key: {} does not exist in the attributeMap", key);
-    }
-  }
-
-  /**
-   * Removes a container ID from the set pointed by the key.
-   *
-   * @param key - key to identify the set.
-   * @param value - Container ID
-   */
-  public boolean remove(T key, ContainerID value) {
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(value);
-
-    if (attributeMap.containsKey(key)) {
-      if (!attributeMap.get(key).remove(value)) {
-        LOG.debug("ContainerID: {} does not exist in the set pointed by " +
-            "key:{}", value, key);
-        return false;
-      }
-      return true;
-    } else {
-      LOG.debug("key: {} does not exist in the attributeMap", key);
-      return false;
-    }
-  }
-
-  /**
-   * Returns the collection that maps to the given key.
-   *
-   * @param key - Key to the bucket.
-   * @return Underlying Set in immutable form.
-   */
-  public NavigableSet<ContainerID> getCollection(T key) {
-    Preconditions.checkNotNull(key);
-
-    if (this.attributeMap.containsKey(key)) {
-      return Collections.unmodifiableNavigableSet(this.attributeMap.get(key));
-    }
-    LOG.debug("No such Key. Key {}", key);
-    return EMPTY_SET;
-  }
-
-  /**
-   * Moves a ContainerID from one bucket to another.
-   *
-   * @param currentKey - Current Key
-   * @param newKey - newKey
-   * @param value - ContainerID
-   * @throws SCMException on Error
-   */
-  public void update(T currentKey, T newKey, ContainerID value)
-      throws SCMException {
-    Preconditions.checkNotNull(currentKey);
-    Preconditions.checkNotNull(newKey);
-
-    boolean removed = false;
-    try {
-      removed = remove(currentKey, value);
-      if (!removed) {
-        throw new SCMException("Unable to find key in the current key bucket",
-            FAILED_TO_CHANGE_CONTAINER_STATE);
-      }
-      insert(newKey, value);
-    } catch (SCMException ex) {
-      // if we removed the key, insert it back to original bucket, since the
-      // next insert failed.
-      LOG.error("error in update.", ex);
-      if (removed) {
-        insert(currentKey, value);
-        LOG.trace("reinserted the removed key. {}", currentKey);
-      }
-      throw ex;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
deleted file mode 100644
index 6c492ff..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-package org.apache.hadoop.ozone.scm.container.ContainerStates;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-
-/**
- * Class that acts as the container state.
- */
-public class ContainerState {
-  private final OzoneProtos.ReplicationType type;
-  private final String owner;
-  private final OzoneProtos.ReplicationFactor replicationFactor;
-
-  /**
-   * Constructs a Container Key.
-   *
-   * @param owner - Container Owners
-   * @param type - Replication Type.
-   * @param factor - Replication Factors
-   */
-  public ContainerState(String owner, OzoneProtos.ReplicationType type,
-      OzoneProtos.ReplicationFactor factor) {
-    this.type = type;
-    this.owner = owner;
-    this.replicationFactor = factor;
-  }
-
-
-  public OzoneProtos.ReplicationType getType() {
-    return type;
-  }
-
-  public String getOwner() {
-    return owner;
-  }
-
-  public OzoneProtos.ReplicationFactor getFactor() {
-    return replicationFactor;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    ContainerState that = (ContainerState) o;
-
-    return new EqualsBuilder()
-        .append(type, that.type)
-        .append(owner, that.owner)
-        .append(replicationFactor, that.replicationFactor)
-        .isEquals();
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder(137, 757)
-        .append(type)
-        .append(owner)
-        .append(replicationFactor)
-        .toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "ContainerKey{" +
-        ", type=" + type +
-        ", owner=" + owner +
-        ", replicationFactor=" + replicationFactor +
-        '}';
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
deleted file mode 100644
index eebc6be..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-package org.apache.hadoop.ozone.scm.container.ContainerStates;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
-import org.apache.hadoop.ozone.scm.exceptions.SCMException;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.util.AutoCloseableLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
-    .CONTAINER_EXISTS;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_FIND_CONTAINER;
-
-/**
- * Container State Map acts like a unified map for various attributes that are
- * used to select containers when we need allocated blocks.
- * <p>
- * This class provides the ability to query 4 classes of attributes. They are
- * <p>
- * 1. LifeCycleStates - LifeCycle States of container describe in which state
- * a container is. For example, a container needs to be in Open State for a
- * client to able to write to it.
- * <p>
- * 2. Owners - Each instance of Name service, for example, Namenode of HDFS or
- * Key Space Manager (KSM) of Ozone or CBlockServer --  is an owner. It is
- * possible to have many KSMs for a Ozone cluster and only one SCM. But SCM
- * keeps the data from each KSM in separate bucket, never mixing them. To
- * write data, often we have to find all open containers for a specific owner.
- * <p>
- * 3. ReplicationType - The clients are allowed to specify what kind of
- * replication pipeline they want to use. Each Container exists on top of a
- * pipeline, so we need to get ReplicationType that is specified by the user.
- * <p>
- * 4. ReplicationFactor - The replication factor represents how many copies
- * of data should be made, right now we support 2 different types, ONE
- * Replica and THREE Replica. User can specify how many copies should be made
- * for a ozone key.
- * <p>
- * The most common access pattern of this class is to select a container based
- * on all these parameters, for example, when allocating a block we will
- * select a container that belongs to user1, with Ratis replication which can
- * make 3 copies of data. The fact that we will look for open containers by
- * default and if we cannot find them we will add new containers.
- */
-public class ContainerStateMap {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerStateMap.class);
-
-  private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
-  private final ContainerAttribute<String> ownerMap;
-  private final ContainerAttribute<ReplicationFactor> factorMap;
-  private final ContainerAttribute<ReplicationType> typeMap;
-
-  private final Map<ContainerID, ContainerInfo> containerMap;
-  private final static NavigableSet<ContainerID> EMPTY_SET  =
-      Collections.unmodifiableNavigableSet(new TreeSet<>());
-
-  // Container State Map lock should be held before calling into
-  // Update ContainerAttributes. The consistency of ContainerAttributes is
-  // protected by this lock.
-  private final AutoCloseableLock autoLock;
-
-  /**
-   * Create a ContainerStateMap.
-   */
-  public ContainerStateMap() {
-    lifeCycleStateMap = new ContainerAttribute<>();
-    ownerMap = new ContainerAttribute<>();
-    factorMap = new ContainerAttribute<>();
-    typeMap = new ContainerAttribute<>();
-    containerMap = new HashMap<>();
-    autoLock = new AutoCloseableLock();
-//        new InstrumentedLock(getClass().getName(), LOG,
-//            new ReentrantLock(),
-//            1000,
-//            300));
-  }
-
-  /**
-   * Adds a ContainerInfo Entry in the ContainerStateMap.
-   *
-   * @param info - container info
-   * @throws SCMException - throws if create failed.
-   */
-  public void addContainer(ContainerInfo info)
-      throws SCMException {
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      ContainerID id = ContainerID.valueof(info.getContainerID());
-      if (containerMap.putIfAbsent(id, info) != null) {
-        LOG.debug("Duplicate container ID detected. {}", id);
-        throw new
-            SCMException("Duplicate container ID detected.",
-            CONTAINER_EXISTS);
-      }
-
-      lifeCycleStateMap.insert(info.getState(), id);
-      ownerMap.insert(info.getOwner(), id);
-      factorMap.insert(info.getPipeline().getFactor(), id);
-      typeMap.insert(info.getPipeline().getType(), id);
-      LOG.trace("Created container with {} successfully.", id);
-    }
-  }
-
-  /**
-   * Returns the latest state of Container from SCM's Container State Map.
-   *
-   * @param info - ContainerInfo
-   * @return ContainerInfo
-   */
-  public ContainerInfo getContainerInfo(ContainerInfo info) {
-    return getContainerInfo(info.getContainerID());
-  }
-
-  /**
-   * Returns the latest state of Container from SCM's Container State Map.
-   *
-   * @param containerID - int
-   * @return container info, if found.
-   */
-  public ContainerInfo getContainerInfo(long containerID) {
-    ContainerID id = new ContainerID(containerID);
-    return containerMap.get(id);
-  }
-
-  /**
-   * Returns the full container Map.
-   *
-   * @return - Map
-   */
-  public Map<ContainerID, ContainerInfo> getContainerMap() {
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      return Collections.unmodifiableMap(containerMap);
-    }
-  }
-
-  /**
-   * Just update the container State.
-   * @param info ContainerInfo.
-   */
-  public void updateContainerInfo(ContainerInfo info) throws SCMException {
-    Preconditions.checkNotNull(info);
-    ContainerInfo currentInfo = null;
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      currentInfo = containerMap.get(
-          ContainerID.valueof(info.getContainerID()));
-
-      if (currentInfo == null) {
-        throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
-      }
-      containerMap.put(info.containerID(), info);
-    }
-  }
-
-  /**
-   * Update the State of a container.
-   *
-   * @param info - ContainerInfo
-   * @param currentState - CurrentState
-   * @param newState - NewState.
-   * @throws SCMException - in case of failure.
-   */
-  public void updateState(ContainerInfo info, LifeCycleState currentState,
-      LifeCycleState newState) throws SCMException {
-    Preconditions.checkNotNull(currentState);
-    Preconditions.checkNotNull(newState);
-
-    ContainerID id = new ContainerID(info.getContainerID());
-    ContainerInfo currentInfo = null;
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      currentInfo = containerMap.get(id);
-
-      if (currentInfo == null) {
-        throw new
-            SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
-      }
-      // We are updating two places before this update is done, these can
-      // fail independently, since the code needs to handle it.
-
-      // We update the attribute map, if that fails it will throw an exception,
-      // so no issues, if we are successful, we keep track of the fact that we
-      // have updated the lifecycle state in the map, and update the container
-      // state. If this second update fails, we will attempt to roll back the
-      // earlier change we did. If the rollback fails, we can be in an
-      // inconsistent state,
-
-      info.setState(newState);
-      containerMap.put(id, info);
-      lifeCycleStateMap.update(currentState, newState, id);
-      LOG.trace("Updated the container {} to new state. Old = {}, new = " +
-          "{}", id, currentState, newState);
-    } catch (SCMException ex) {
-      LOG.error("Unable to update the container state. {}", ex);
-      // we need to revert the change in this attribute since we are not
-      // able to update the hash table.
-      LOG.info("Reverting the update to lifecycle state. Moving back to " +
-              "old state. Old = {}, Attempted state = {}", currentState,
-          newState);
-
-      containerMap.put(id, currentInfo);
-
-      // if this line throws, the state map can be in an inconsistent
-      // state, since we will have modified the attribute by the
-      // container state will not in sync since we were not able to put
-      // that into the hash table.
-      lifeCycleStateMap.update(newState, currentState, id);
-
-      throw new SCMException("Updating the container map failed.", ex,
-          FAILED_TO_CHANGE_CONTAINER_STATE);
-    }
-  }
-
-  /**
-   * Returns A list of containers owned by a name service.
-   *
-   * @param ownerName - Name of the NameService.
-   * @return - NavigableSet of ContainerIDs.
-   */
-  NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
-    Preconditions.checkNotNull(ownerName);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      return ownerMap.getCollection(ownerName);
-    }
-  }
-
-  /**
-   * Returns Containers in the System by the Type.
-   *
-   * @param type - Replication type -- StandAlone, Ratis etc.
-   * @return NavigableSet
-   */
-  NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
-    Preconditions.checkNotNull(type);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      return typeMap.getCollection(type);
-    }
-  }
-
-  /**
-   * Returns Containers by replication factor.
-   *
-   * @param factor - Replication Factor.
-   * @return NavigableSet.
-   */
-  NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
-    Preconditions.checkNotNull(factor);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      return factorMap.getCollection(factor);
-    }
-  }
-
-  /**
-   * Returns Containers by State.
-   *
-   * @param state - State - Open, Closed etc.
-   * @return List of containers by state.
-   */
-  NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
-    Preconditions.checkNotNull(state);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      return lifeCycleStateMap.getCollection(state);
-    }
-  }
-
-  /**
-   * Gets the containers that matches the  following filters.
-   *
-   * @param state - LifeCycleState
-   * @param owner - Owner
-   * @param factor - Replication Factor
-   * @param type - Replication Type
-   * @return ContainerInfo or Null if not container satisfies the criteria.
-   */
-  public NavigableSet<ContainerID> getMatchingContainerIDs(
-      LifeCycleState state, String owner,
-      ReplicationFactor factor, ReplicationType type) {
-
-    Preconditions.checkNotNull(state, "State cannot be null");
-    Preconditions.checkNotNull(owner, "Owner cannot be null");
-    Preconditions.checkNotNull(factor, "Factor cannot be null");
-    Preconditions.checkNotNull(type, "Type cannot be null");
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-
-      // If we cannot meet any one condition we return EMPTY_SET immediately.
-      // Since when we intersect these sets, the result will be empty if any
-      // one is empty.
-      NavigableSet<ContainerID> stateSet =
-          lifeCycleStateMap.getCollection(state);
-      if (stateSet.size() == 0) {
-        return EMPTY_SET;
-      }
-
-      NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner);
-      if (ownerSet.size() == 0) {
-        return EMPTY_SET;
-      }
-
-      NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor);
-      if (factorSet.size() == 0) {
-        return EMPTY_SET;
-      }
-
-      NavigableSet<ContainerID> typeSet = typeMap.getCollection(type);
-      if (typeSet.size() == 0) {
-        return EMPTY_SET;
-      }
-
-
-      // if we add more constraints we will just add those sets here..
-      NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
-          ownerSet, factorSet, typeSet);
-
-      NavigableSet<ContainerID> currentSet = sets[0];
-      // We take the smallest set and intersect against the larger sets. This
-      // allows us to reduce the lookups to the least possible number.
-      for (int x = 1; x < sets.length; x++) {
-        currentSet = intersectSets(currentSet, sets[x]);
-      }
-      return currentSet;
-    }
-  }
-
-  /**
-   * Calculates the intersection between sets and returns a new set.
-   *
-   * @param smaller - First Set
-   * @param bigger - Second Set
-   * @return resultSet which is the intersection of these two sets.
-   */
-  private NavigableSet<ContainerID> intersectSets(
-      NavigableSet<ContainerID> smaller,
-      NavigableSet<ContainerID> bigger) {
-    Preconditions.checkState(smaller.size() <= bigger.size(),
-        "This function assumes the first set is lesser or equal to second " +
-            "set");
-    NavigableSet<ContainerID> resultSet = new TreeSet<>();
-    for (ContainerID id : smaller) {
-      if (bigger.contains(id)) {
-        resultSet.add(id);
-      }
-    }
-    return resultSet;
-  }
-
-  /**
-   * Sorts a list of Sets based on Size. This is useful when we are
-   * intersecting the sets.
-   *
-   * @param sets - varagrs of sets
-   * @return Returns a sorted array of sets based on the size of the set.
-   */
-  @SuppressWarnings("unchecked")
-  private NavigableSet<ContainerID>[] sortBySize(
-      NavigableSet<ContainerID>... sets) {
-    for (int x = 0; x < sets.length - 1; x++) {
-      for (int y = 0; y < sets.length - x - 1; y++) {
-        if (sets[y].size() > sets[y + 1].size()) {
-          NavigableSet temp = sets[y];
-          sets[y] = sets[y + 1];
-          sets[y + 1] = temp;
-        }
-      }
-    }
-    return sets;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
deleted file mode 100644
index 6a7e663..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- *
- */
-
-/**
- * Container States management package.
- */
-package org.apache.hadoop.ozone.scm.container.ContainerStates;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
deleted file mode 100644
index 0d442d1..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.scm.container;
-
-
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Mapping class contains the mapping from a name to a pipeline mapping. This 
is
- * used by SCM when allocating new locations and when looking up a key.
- */
-public interface Mapping extends Closeable {
-  /**
-   * Returns the ContainerInfo from the container name.
-   *
-   * @param containerName - Name
-   * @return - ContainerInfo such as creation state and the pipeline.
-   * @throws IOException
-   */
-  ContainerInfo getContainer(String containerName) throws IOException;
-
-  /**
-   * Returns containers under certain conditions.
-   * Search container names from start name(exclusive),
-   * and use prefix name to filter the result. The max
-   * size of the searching range cannot exceed the
-   * value of count.
-   *
-   * @param startName start name, if null, start searching at the head.
-   * @param prefixName prefix name, if null, then filter is disabled.
-   * @param count count, if count < 0, the max size is unlimited.(
-   *              Usually the count will be replace with a very big
-   *              value instead of being unlimited in case the db is very big)
-   *
-   * @return a list of container.
-   * @throws IOException
-   */
-  List<ContainerInfo> listContainer(String startName, String prefixName,
-      int count) throws IOException;
-
-  /**
-   * Allocates a new container for a given keyName and replication factor.
-   *
-   * @param replicationFactor - replication factor of the container.
-   * @param containerName - Name.
-   * @param owner
-   * @return - Container Info.
-   * @throws IOException
-   */
-  ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
-      OzoneProtos.ReplicationFactor replicationFactor,
-      String containerName, String owner) throws IOException;
-
-  /**
-   * Deletes a container from SCM.
-   *
-   * @param containerName - Container Name
-   * @throws IOException
-   */
-  void deleteContainer(String containerName) throws IOException;
-
-  /**
-   * Update container state.
-   * @param containerName - Container Name
-   * @param event - container life cycle event
-   * @return - new container state
-   * @throws IOException
-   */
-  OzoneProtos.LifeCycleState updateContainerState(String containerName,
-      OzoneProtos.LifeCycleEvent event) throws IOException;
-
-  /**
-   * Returns the container State Manager.
-   * @return ContainerStateManager
-   */
-  ContainerStateManager getStateManager();
-
-  /**
-   * Process container report from Datanode.
-   *
-   * @param reports Container report
-   */
-  void processContainerReports(ContainerReportsRequestProto reports)
-      throws IOException;
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to