http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 4974268..70a0e8a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -23,7 +23,6 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -83,11 +82,10 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   public ContainerResponseProto allocateContainer(RpcController unused,
       ContainerRequestProto request) throws ServiceException {
     try {
-      Pipeline pipeline = impl.allocateContainer(request.getReplicationType(),
-          request.getReplicationFactor(), request.getContainerName(),
-          request.getOwner());
+      ContainerInfo container = impl.allocateContainer(request.getReplicationType(),
+          request.getReplicationFactor(), request.getOwner());
       return ContainerResponseProto.newBuilder()
-          .setPipeline(pipeline.getProtobufMessage())
+          .setContainerInfo(container.getProtobuf())
           .setErrorCode(ContainerResponseProto.Error.success)
           .build();
 
@@ -101,9 +99,9 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       RpcController controller, GetContainerRequestProto request)
       throws ServiceException {
     try {
-      Pipeline pipeline = impl.getContainer(request.getContainerName());
+      ContainerInfo container = impl.getContainer(request.getContainerID());
       return GetContainerResponseProto.newBuilder()
-          .setPipeline(pipeline.getProtobufMessage())
+          .setContainerInfo(container.getProtobuf())
           .build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -114,23 +112,17 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   public SCMListContainerResponseProto listContainer(RpcController controller,
       SCMListContainerRequestProto request) throws ServiceException {
     try {
-      String startName = null;
-      String prefixName = null;
+      long startContainerID = 0;
       int count = -1;
 
       // Arguments check.
-      if (request.hasPrefixName()) {
+      if (request.hasStartContainerID()) {
         // Start container ID is given.
-        prefixName = request.getPrefixName();
+        startContainerID = request.getStartContainerID();
       }
-      if (request.hasStartName()) {
-        // End container name is given.
-        startName = request.getStartName();
-      }
-
       count = request.getCount();
       List<ContainerInfo> containerList =
-          impl.listContainer(startName, prefixName, count);
+          impl.listContainer(startContainerID, count);
       SCMListContainerResponseProto.Builder builder =
           SCMListContainerResponseProto.newBuilder();
       for (ContainerInfo container : containerList) {
@@ -147,7 +139,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       RpcController controller, SCMDeleteContainerRequestProto request)
       throws ServiceException {
     try {
-      impl.deleteContainer(request.getContainerName());
+      impl.deleteContainer(request.getContainerID());
       return SCMDeleteContainerResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -178,7 +170,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       RpcController controller, ObjectStageChangeRequestProto request)
       throws ServiceException {
     try {
-      impl.notifyObjectStageChange(request.getType(), request.getName(),
+      impl.notifyObjectStageChange(request.getType(), request.getId(),
           request.getOp(), request.getStage());
       return ObjectStageChangeResponseProto.newBuilder().build();
     } catch (IOException e) {

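With container names gone from this protocol, a client addresses containers purely by ID. As a rough illustration, a caller might now build the allocate request like this (a sketch assuming the protoc-generated builders for StorageContainerLocationProtocol.proto; the rpcProxy handle and owner string are placeholders):

    ContainerRequestProto request = ContainerRequestProto.newBuilder()
        .setReplicationType(HddsProtos.ReplicationType.RATIS)
        .setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
        .setOwner("OZONE")
        .build();
    ContainerResponseProto response = rpcProxy.allocateContainer(null, request);
    // The response now carries an SCMContainerInfo instead of a bare Pipeline.
    long containerID = response.getContainerInfo().getContainerID();
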
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
index 83ca83d..13b9180 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
@@ -76,7 +76,9 @@ public class LevelDBStore implements MetadataStore {
   }
 
   private void openDB(File dbPath, Options options) throws IOException {
-    dbPath.getParentFile().mkdirs();
+    if (dbPath.getParentFile().mkdirs()) {
+      LOG.debug("Db path {} created.", dbPath.getParentFile());
+    }
     db = JniDBFactory.factory.open(dbPath, options);
     if (LOG.isDebugEnabled()) {
       LOG.debug("LevelDB successfully opened");

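Checking the mkdirs() return value silences the ignored-return-value warning, but note that mkdirs() also returns false when the directory already exists, so the debug line only fires on first creation. A stricter variant (a sketch, not part of this patch) would fail fast only when the directory is absent and cannot be created:

    File parent = dbPath.getParentFile();
    if (!parent.exists() && !parent.mkdirs()) {
      throw new IOException("Unable to create DB directory " + parent);
    }
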
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
index 3ff0a94..d3a2943 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.utils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 /**
@@ -94,8 +94,8 @@ public final class MetadataKeyFilters {
       if (Strings.isNullOrEmpty(keyPrefix)) {
         accept = true;
       } else {
-        if (currentKey != null &&
-            DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) {
+        byte [] prefixBytes = keyPrefix.getBytes();
+        if (currentKey != null && prefixMatch(prefixBytes, currentKey)) {
           keysHinted++;
           accept = true;
         } else {
@@ -114,5 +114,19 @@ public final class MetadataKeyFilters {
     public int getKeysHintedNum() {
       return keysHinted;
     }
+
+    private boolean prefixMatch(byte[] prefix, byte[] key) {
+      Preconditions.checkNotNull(prefix);
+      Preconditions.checkNotNull(key);
+      if (key.length < prefix.length) {
+        return false;
+      }
+      for (int i = 0; i < prefix.length; i++) {
+        if (key[i] != prefix[i]) {
+          return false;
+        }
+      }
+      return true;
+    }
   }
 }

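prefixMatch() removes the dependency on hdfs' DFSUtil and avoids allocating a String per scanned key. One caveat worth noting: keyPrefix.getBytes() uses the platform default charset. An equivalent sketch that pins the charset and compares ranges via the JDK (the range overload of Arrays.equals is Java 9+, so this is illustrative only):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    private boolean prefixMatch(String keyPrefix, byte[] key) {
      byte[] prefix = keyPrefix.getBytes(StandardCharsets.UTF_8);
      return key.length >= prefix.length
          && Arrays.equals(prefix, 0, prefix.length, key, 0, prefix.length);
    }
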
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
index a60e98d..0dfca20 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
@@ -367,6 +367,7 @@ public class RocksDBStore implements MetadataStore {
   public void close() throws IOException {
     if (statMBeanName != null) {
       MBeans.unregister(statMBeanName);
+      statMBeanName = null;
     }
     if (db != null) {
       db.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index a6270ef..e7494ee 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -197,17 +197,15 @@ message ContainerCommandResponseProto {
 }
 
 message ContainerData {
-  required string name = 1;
+  required int64 containerID = 1;
   repeated KeyValue metadata = 2;
   optional string dbPath = 3;
   optional string containerPath = 4;
-  optional string hash = 6;
-  optional int64 bytesUsed = 7;
-  optional int64 size = 8;
-  optional int64 keyCount = 9;
-  //TODO: change required after we switch container ID from string to long
-  optional int64 containerID = 10;
-  optional LifeCycleState state = 11 [default = OPEN];
+  optional string hash = 5;
+  optional int64 bytesUsed = 6;
+  optional int64 size = 7;
+  optional int64 keyCount = 8;
+  optional LifeCycleState state = 9 [default = OPEN];
 }
 
 message ContainerMeta {
@@ -226,7 +224,7 @@ message  CreateContainerResponseProto {
 
 message  ReadContainerRequestProto {
   required Pipeline pipeline = 1;
-  required string name = 2;
+  required int64 containerID = 2;
 }
 
 message  ReadContainerResponseProto {
@@ -243,19 +241,16 @@ message  UpdateContainerResponseProto {
 }
 
 message  DeleteContainerRequestProto {
-  required Pipeline pipeline = 1;
-  required string name = 2;
-  optional bool forceDelete = 3 [default = false];
+  required int64 containerID = 1;
+  optional bool forceDelete = 2 [default = false];
 }
 
 message  DeleteContainerResponseProto {
 }
 
 message  ListContainerRequestProto {
-  required Pipeline pipeline = 1;
-  optional string prefix = 2;
-  required uint32 count = 3; // Max Results to return
-  optional string prevKey = 4;  // if this is not set query from start.
+  required int64 startContainerID = 1;
+  optional uint32 count = 2; // Max Results to return
 }
 
 message  ListContainerResponseProto {
@@ -263,34 +258,31 @@ message  ListContainerResponseProto {
 }
 
 message CloseContainerRequestProto {
-  required Pipeline pipeline = 1;
+  required int64 containerID = 1;
 }
 
 message CloseContainerResponseProto {
-  optional Pipeline pipeline = 1;
   optional string hash = 2;
+  optional int64 containerID = 3;
 }
 
 message KeyData {
-  required string containerName = 1;
-  required string name = 2;
-  optional int64 flags = 3; // for future use.
-  repeated KeyValue metadata = 4;
-  repeated ChunkInfo chunks = 5;
+  required BlockID blockID = 1;
+  optional int64 flags = 2; // for future use.
+  repeated KeyValue metadata = 3;
+  repeated ChunkInfo chunks = 4;
 }
 
 // Key Messages.
 message  PutKeyRequestProto {
-  required Pipeline pipeline = 1;
-  required KeyData keyData = 2;
+  required KeyData keyData = 1;
 }
 
 message  PutKeyResponseProto {
 }
 
 message  GetKeyRequestProto  {
-  required Pipeline pipeline = 1;
-  required KeyData keyData = 2;
+  required KeyData keyData = 1;
 }
 
 message  GetKeyResponseProto  {
@@ -299,17 +291,15 @@ message  GetKeyResponseProto  {
 
 
 message  DeleteKeyRequestProto {
-  required Pipeline pipeline = 1;
-  required string name = 2;
+  required BlockID blockID = 1;
 }
 
 message   DeleteKeyResponseProto {
 }
 
 message  ListKeyRequestProto {
-  required Pipeline pipeline = 1;
-  optional string prefix = 2; // if specified returns keys that match prefix.
-  required string prevKey = 3;
+  required int64 containerID = 1;
+  optional int64 startLocalID = 2;
   required uint32 count = 4;
 
 }
@@ -335,31 +325,28 @@ enum Stage {
 }
 
 message  WriteChunkRequestProto  {
-  required Pipeline pipeline = 1;
-  required string keyName = 2;
-  required ChunkInfo chunkData = 3;
-  optional bytes data = 4;
-  optional Stage stage = 5 [default = COMBINED];
+  required BlockID blockID = 1;
+  required ChunkInfo chunkData = 2;
+  optional bytes data = 3;
+  optional Stage stage = 4 [default = COMBINED];
 }
 
 message  WriteChunkResponseProto {
 }
 
 message  ReadChunkRequestProto  {
-  required Pipeline pipeline = 1;
-  required string keyName = 2;
-  required ChunkInfo chunkData = 3;
+  required BlockID blockID = 1;
+  required ChunkInfo chunkData = 2;
 }
 
 message  ReadChunkResponseProto {
-  required Pipeline pipeline = 1;
+  required BlockID blockID = 1;
   required ChunkInfo chunkData = 2;
   required bytes data = 3;
 }
 
 message  DeleteChunkRequestProto {
-  required Pipeline pipeline = 1;
-  required string keyName = 2;
+  required BlockID blockID = 1;
   required ChunkInfo chunkData = 3;
 }
 
@@ -367,10 +354,9 @@ message  DeleteChunkResponseProto {
 }
 
 message  ListChunkRequestProto {
-  required Pipeline pipeline = 1;
-  required string keyName = 2;
-  required string prevChunkName = 3;
-  required uint32 count = 4;
+  required BlockID blockID = 1;
+  required string prevChunkName = 2;
+  required uint32 count = 3;
 }
 
 message  ListChunkResponseProto {
@@ -400,7 +386,7 @@ message GetSmallFileResponseProto {
 }
 
 message CopyContainerRequestProto {
-  required string containerName = 1;
+  required int64 containerID = 1;
   required uint64 readOffset = 2;
   optional uint64 len = 3;
 }

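Every datanode-facing message above is now keyed by the numeric BlockID (containerID plus localID) instead of a Pipeline and a string key name. A sketch of building a write-chunk request under the new schema, assuming the protoc-generated classes (chunkInfo and payload are placeholders built elsewhere):

    HddsProtos.BlockID blockID = HddsProtos.BlockID.newBuilder()
        .setContainerID(1L)
        .setLocalID(42L)
        .build();
    ContainerProtos.WriteChunkRequestProto writeChunk =
        ContainerProtos.WriteChunkRequestProto.newBuilder()
            .setBlockID(blockID)
            .setChunkData(chunkInfo)               // a previously built ChunkInfo
            .setData(ByteString.copyFrom(payload))
            .setStage(ContainerProtos.Stage.COMBINED)
            .build();
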
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
index 38d2e16..7bea82a 100644
--- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
@@ -33,28 +33,6 @@ import "hdds.proto";
 
 
 // SCM Block protocol
-/**
- * keys - batch of block keys to find
- */
-message GetScmBlockLocationsRequestProto {
-  repeated string keys = 1;
-}
-
-/**
- * locatedBlocks - for each requested hash, nodes that currently host the
- *     container for that object key hash
- */
-message GetScmBlockLocationsResponseProto {
-  repeated ScmLocatedBlockProto locatedBlocks = 1;
-}
-
-/**
- * Holds the nodes that currently host the blocks for a key.
- */
-message ScmLocatedBlockProto {
-  required string key = 1;
-  required hadoop.hdds.Pipeline pipeline = 2;
-}
 
 /**
 * Request send to SCM asking allocate block of specified size.
@@ -84,7 +62,7 @@ message DeleteScmKeyBlocksRequestProto {
  */
 message KeyBlocks {
   required string key = 1;
-  repeated string blocks = 2;
+  repeated BlockID blocks = 2;
 }
 
 /**
@@ -112,7 +90,7 @@ message DeleteScmBlockResult {
     unknownFailure = 4;
   }
   required Result result = 1;
-  required string key = 2;
+  required BlockID blockID = 2;
 }
 
 /**
@@ -126,7 +104,7 @@ message AllocateScmBlockResponseProto {
     unknownFailure = 4;
   }
   required Error errorCode = 1;
-  required string key = 2;
+  required BlockID blockID = 2;
   required hadoop.hdds.Pipeline pipeline = 3;
   required bool createContainer = 4;
   optional string errorMessage = 5;
@@ -139,14 +117,6 @@ message AllocateScmBlockResponseProto {
 service ScmBlockLocationProtocolService {
 
   /**
-   * Find the set of nodes that currently host the block, as
-   * identified by the key.  This method supports batch lookup by
-   * passing multiple keys.
-   */
-  rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
-      returns (GetScmBlockLocationsResponseProto);
-
-  /**
    * Creates a block entry in SCM.
    */
   rpc allocateScmBlock(AllocateScmBlockRequestProto)

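Block keys in this protocol become typed BlockIDs as well: DeleteScmKeyBlocksRequestProto groups them per object key, and both result messages now echo the BlockID back. A sketch of assembling one KeyBlocks entry (generated class names are assumptions based on the messages above):

    KeyBlocks keyBlocks = KeyBlocks.newBuilder()
        .setKey("volume/bucket/key")
        .addBlocks(HddsProtos.BlockID.newBuilder()
            .setContainerID(7L)
            .setLocalID(3L)
            .build())
        .build();
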
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index d7540a3..090e6eb 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -35,7 +35,6 @@ import "hdds.proto";
 * Request send to SCM asking where the container should be created.
 */
 message ContainerRequestProto {
-  required string containerName = 1;
   // Ozone only supports replication of either 1 or 3.
   required ReplicationFactor replicationFactor = 2;
   required ReplicationType  replicationType = 3;
@@ -53,30 +52,29 @@ message ContainerResponseProto {
     errorContainerMissing = 3;
   }
   required Error errorCode = 1;
-  required Pipeline pipeline = 2;
+  required SCMContainerInfo containerInfo = 2;
   optional string errorMessage = 3;
 }
 
 message GetContainerRequestProto {
-  required string containerName = 1;
+  required int64 containerID = 1;
 }
 
 message GetContainerResponseProto {
-  required Pipeline pipeline = 1;
+  required SCMContainerInfo containerInfo = 1;
 }
 
 message SCMListContainerRequestProto {
   required uint32 count = 1;
-  optional string startName = 2;
-  optional string prefixName = 3;
-}
+  optional uint64 startContainerID = 2;
+ }
 
 message SCMListContainerResponseProto {
   repeated SCMContainerInfo containers = 1;
 }
 
 message SCMDeleteContainerRequestProto {
-  required string containerName = 1;
+  required int64 containerID = 1;
 }
 
 message SCMDeleteContainerResponseProto {
@@ -97,7 +95,7 @@ message ObjectStageChangeRequestProto {
     begin = 1;
     complete = 2;
   }
-  required string name = 1;
+  required int64 id = 1;
   required Type type = 2;
   required Op op= 3;
   required Stage stage = 4;

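Listing thus switches from name/prefix paging to an ID cursor: a page starts strictly after startContainerID, matching the tailMap(startContainerID, false) behavior in ContainerManagerImpl further down. A hypothetical client loop (scmClient and its method shape are illustrative, not an API in this patch):

    long cursor = 0;                 // 0 means "from the beginning"
    List<ContainerInfo> batch;
    do {
      batch = scmClient.listContainer(cursor, 64);
      for (ContainerInfo info : batch) {
        process(info);               // application-specific handling
        cursor = info.getContainerID();
      }
    } while (batch.size() == 64);    // a short page means the listing is done
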
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 0b650b4..6ea5727 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -50,7 +50,6 @@ message PipelineChannel {
 // A pipeline is composed of PipelineChannel (Ratis/StandAlone) that back a
 // container.
 message Pipeline {
-    required string containerName = 1;
     required PipelineChannel pipelineChannel = 2;
 }
 
@@ -135,8 +134,7 @@ enum LifeCycleEvent {
 }
 
 message SCMContainerInfo {
-    // TODO : Remove the container name from pipeline.
-    required string containerName = 1;
+    required int64 containerID = 1;
     required LifeCycleState state = 2;
     required Pipeline pipeline = 3;
     // This is not total size of container, but space allocated by SCM for
@@ -146,7 +144,6 @@ message SCMContainerInfo {
     required uint64 numberOfKeys = 6;
     optional int64 stateEnterTime = 7;
     required string owner = 8;
-    required int64 containerID = 9;
 }
 
 message GetScmInfoRequestProto {
@@ -168,3 +165,11 @@ enum ReplicationFactor {
     ONE = 1;
     THREE = 3;
 }
+
+/**
+ * Block ID that uniquely identify a block by SCM.
+ */
+message BlockID {
+    required int64 containerID = 1;
+    required int64 localID = 2;
+}

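This proto message gets a Java counterpart in the same change set (ChunkManagerImpl below imports org.apache.hadoop.hdds.client.BlockID). A minimal sketch of what such a wrapper needs; the accessor names mirror the proto fields but are assumptions:

    public final class BlockID {
      private final long containerID;
      private final long localID;

      public BlockID(long containerID, long localID) {
        this.containerID = containerID;
        this.localID = localID;
      }

      public long getContainerID() { return containerID; }
      public long getLocalID() { return localID; }

      // Bridge to the wire form defined in hdds.proto above.
      public HddsProtos.BlockID getProtobuf() {
        return HddsProtos.BlockID.newBuilder()
            .setContainerID(containerID)
            .setLocalID(localID)
            .build();
      }
    }
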
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
index 68bf442..8c5609d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
@@ -105,18 +104,17 @@ public final class ChunkUtils {
    * Validates chunk data and returns a file object to Chunk File that we are
    * expected to write data to.
    *
-   * @param pipeline - pipeline.
    * @param data - container data.
    * @param info - chunk info.
    * @return File
    * @throws StorageContainerException
    */
-  public static File validateChunk(Pipeline pipeline, ContainerData data,
+  public static File validateChunk(ContainerData data,
       ChunkInfo info) throws StorageContainerException {
 
     Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
 
-    File chunkFile = getChunkFile(pipeline, data, info);
+    File chunkFile = getChunkFile(data, info);
     if (ChunkUtils.isOverWriteRequested(chunkFile, info)) {
       if (!ChunkUtils.isOverWritePermitted(info)) {
         log.error("Rejecting write chunk request. Chunk overwrite " +
@@ -132,21 +130,21 @@ public final class ChunkUtils {
   /**
    * Validates that Path to chunk file exists.
    *
-   * @param pipeline - Container Info.
    * @param data - Container Data
    * @param info - Chunk info
    * @return - File.
    * @throws StorageContainerException
    */
-  public static File getChunkFile(Pipeline pipeline, ContainerData data,
+  public static File getChunkFile(ContainerData data,
       ChunkInfo info) throws StorageContainerException {
 
+    Preconditions.checkNotNull(data, "Container data can't be null");
     Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-    if (data == null) {
-      log.error("Invalid container Name: {}", pipeline.getContainerName());
-      throw new StorageContainerException("Unable to find the container Name:" +
+
+    if (data.getContainerID() < 0) {
+      log.error("Invalid container id: {}", data.getContainerID());
+      throw new StorageContainerException("Unable to find the container id:" +
           " " +
-          pipeline.getContainerName(), CONTAINER_NOT_FOUND);
+          data.getContainerID(), CONTAINER_NOT_FOUND);
     }
 
     File dataDir = ContainerUtils.getDataDirectory(data).toFile();
@@ -335,7 +333,7 @@ public final class ChunkUtils {
         ContainerProtos.ReadChunkResponseProto.newBuilder();
     response.setChunkData(info.getProtoBufMessage());
     response.setData(ByteString.copyFrom(data));
-    response.setPipeline(msg.getReadChunk().getPipeline());
+    response.setBlockID(msg.getReadChunk().getBlockID());
 
     ContainerProtos.ContainerCommandResponseProto.Builder builder =
         ContainerUtils.getContainerResponse(msg, ContainerProtos.Result

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
index c29374c..c20282a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
@@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class ContainerData {
 
-  private final String containerName;
   private final Map<String, String> metadata;
   private String dbPath;  // Path to Level DB Store.
   // Path to Physical file system where container and checksum are stored.
@@ -48,18 +47,18 @@ public class ContainerData {
   private String hash;
   private AtomicLong bytesUsed;
   private long maxSize;
-  private Long containerID;
+  private long containerID;
   private HddsProtos.LifeCycleState state;
 
   /**
    * Constructs a  ContainerData Object.
    *
-   * @param containerName - Name
+   * @param containerID - ID
+   * @param conf - Configuration
    */
-  public ContainerData(String containerName, Long containerID,
+  public ContainerData(long containerID,
       Configuration conf) {
     this.metadata = new TreeMap<>();
-    this.containerName = containerName;
     this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
         ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
     this.bytesUsed =  new AtomicLong(0L);
@@ -76,7 +75,7 @@ public class ContainerData {
   public static ContainerData getFromProtBuf(
       ContainerProtos.ContainerData protoData, Configuration conf)
       throws IOException {
-    ContainerData data = new ContainerData(protoData.getName(),
+    ContainerData data = new ContainerData(
         protoData.getContainerID(), conf);
     for (int x = 0; x < protoData.getMetadataCount(); x++) {
       data.addMetadata(protoData.getMetadata(x).getKey(),
@@ -117,7 +116,6 @@ public class ContainerData {
   public ContainerProtos.ContainerData getProtoBufMessage() {
     ContainerProtos.ContainerData.Builder builder = ContainerProtos
         .ContainerData.newBuilder();
-    builder.setName(this.getContainerName());
     builder.setContainerID(this.getContainerID());
 
     if (this.getDBPath() != null) {
@@ -157,15 +155,6 @@ public class ContainerData {
   }
 
   /**
-   * Returns the name of the container.
-   *
-   * @return - name
-   */
-  public String getContainerName() {
-    return containerName;
-  }
-
-  /**
    * Adds metadata.
    */
   public void addMetadata(String key, String value) throws IOException {
@@ -231,9 +220,11 @@ public class ContainerData {
    *
    * @return String Name.
    */
-  public String getName() {
-    return getContainerName();
-  }
+    // TODO: check the ContainerCache class to see if we are using the ContainerID instead.
+   /*
+   public String getName() {
+    return getContainerID();
+  }*/
 
   /**
    * Get container file path.
@@ -255,7 +246,7 @@ public class ContainerData {
    * Get container ID.
    * @return - container ID.
    */
-  public synchronized Long getContainerID() {
+  public synchronized long getContainerID() {
     return containerID;
   }
 
@@ -284,7 +275,7 @@ public class ContainerData {
 
     // Some thing brain dead for now. name + Time stamp of when we get the close
     // container message.
-    setHash(DigestUtils.sha256Hex(this.getContainerName() +
+    setHash(DigestUtils.sha256Hex(this.getContainerID() +
         Long.toString(Time.monotonicNow())));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
index 50d2da3..19634f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
  */
 public class ContainerReport {
   private static final int UNKNOWN = -1;
-  private final String containerName;
   private final String finalhash;
   private long size;
   private long keyCount;
@@ -51,11 +50,11 @@ public class ContainerReport {
   /**
    * Constructs the ContainerReport.
    *
-   * @param containerName - Container Name.
+   * @param containerID - Container ID.
    * @param finalhash - Final Hash.
    */
-  public ContainerReport(String containerName, String finalhash) {
-    this.containerName = containerName;
+  public ContainerReport(long containerID, String finalhash) {
+    this.containerID = containerID;
     this.finalhash = finalhash;
     this.size = UNKNOWN;
     this.keyCount = UNKNOWN;
@@ -74,7 +73,7 @@ public class ContainerReport {
    */
   public static ContainerReport getFromProtoBuf(ContainerInfo info) {
     Preconditions.checkNotNull(info);
-    ContainerReport report = new ContainerReport(info.getContainerName(),
+    ContainerReport report = new ContainerReport(info.getContainerID(),
         info.getFinalhash());
     if (info.hasSize()) {
       report.setSize(info.getSize());
@@ -103,15 +102,6 @@ public class ContainerReport {
   }
 
   /**
-   * Gets the container name.
-   *
-   * @return - Name
-   */
-  public String getContainerName() {
-    return containerName;
-  }
-
-  /**
    * Returns the final signature for this container.
    *
    * @return - hash
@@ -203,7 +193,6 @@ public class ContainerReport {
    */
   public ContainerInfo getProtoBufMessage() {
     return ContainerInfo.newBuilder()
-        .setContainerName(this.getContainerName())
         .setKeyCount(this.getKeyCount())
         .setSize(this.getSize())
         .setUsed(this.getBytesUsed())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 1818188..e244354 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -184,6 +184,12 @@ public final class ContainerUtils {
         removeExtension(containerFile.getName())).toString();
   }
 
+  public static long getContainerIDFromFile(File containerFile) {
+    Preconditions.checkNotNull(containerFile);
+    String containerID = getContainerNameFromFile(containerFile);
+    return Long.parseLong(containerID);
+  }
+
   /**
    * Verifies that this in indeed a new container.
    *
@@ -289,8 +295,8 @@ public final class ContainerUtils {
    */
   public static File getMetadataFile(ContainerData containerData,
       Path location) {
-    return location.resolve(containerData
-        .getContainerName().concat(CONTAINER_META))
+    return location.resolve(Long.toString(containerData
+        .getContainerID()).concat(CONTAINER_META))
         .toFile();
   }
 
@@ -303,8 +309,8 @@ public final class ContainerUtils {
    */
   public static File getContainerFile(ContainerData containerData,
       Path location) {
-    return location.resolve(containerData
-        .getContainerName().concat(CONTAINER_EXTENSION))
+    return location.resolve(Long.toString(containerData
+        .getContainerID()).concat(CONTAINER_EXTENSION))
         .toFile();
   }
 

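getContainerIDFromFile() assumes every on-disk container file name is numeric; a leftover name-based file from before this change would surface as a bare NumberFormatException. A more defensive variant (a sketch, not in this patch) makes that failure explicit:

    public static long getContainerIDFromFile(File containerFile) {
      Preconditions.checkNotNull(containerFile);
      String name = getContainerNameFromFile(containerFile);
      try {
        return Long.parseLong(name);
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Container file " + containerFile
            + " does not encode a numeric container ID", e);
      }
    }
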
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
index ade162a..9d0ec95 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.helpers;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.util.StringUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -37,7 +38,7 @@ public final class DeletedContainerBlocksSummary {
   // value : the number of blocks need to be deleted in this container
   // if the message contains multiple entries for same block,
   // blocks will be merged
-  private final Map<String, Integer> blockSummary;
+  private final Map<Long, Integer> blockSummary;
   // total number of blocks in this message
   private int numOfBlocks;
 
@@ -47,14 +48,14 @@ public final class DeletedContainerBlocksSummary {
     blockSummary = Maps.newHashMap();
     blocks.forEach(entry -> {
       txSummary.put(entry.getTxID(), entry.getCount());
-      if (blockSummary.containsKey(entry.getContainerName())) {
-        blockSummary.put(entry.getContainerName(),
-            blockSummary.get(entry.getContainerName())
-                + entry.getBlockIDCount());
+      if (blockSummary.containsKey(entry.getContainerID())) {
+        blockSummary.put(entry.getContainerID(),
+            blockSummary.get(entry.getContainerID())
+                + entry.getLocalIDCount());
       } else {
-        blockSummary.put(entry.getContainerName(), entry.getBlockIDCount());
+        blockSummary.put(entry.getContainerID(), entry.getLocalIDCount());
       }
-      numOfBlocks += entry.getBlockIDCount();
+      numOfBlocks += entry.getLocalIDCount();
     });
   }
 
@@ -93,9 +94,9 @@ public final class DeletedContainerBlocksSummary {
           .append("TimesProceed=")
           .append(blks.getCount())
           .append(", ")
-          .append(blks.getContainerName())
+          .append(blks.getContainerID())
           .append(" : [")
-          .append(String.join(",", blks.getBlockIDList())).append("]")
+          .append(StringUtils.join(',', blks.getLocalIDList())).append("]")
           .append("\n");
     }
     return sb.toString();

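The containsKey/get/put sequence in the constructor can be collapsed with Map.merge from JDK 8; an equivalent sketch of the same summarizing loop:

    blocks.forEach(entry -> {
      txSummary.put(entry.getTxID(), entry.getCount());
      // merge(): insert the count, or add it to the container's existing total
      blockSummary.merge(entry.getContainerID(), entry.getLocalIDCount(),
          Integer::sum);
      numOfBlocks += entry.getLocalIDCount();
    });
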
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
index 566db02..ec27452 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
@@ -65,7 +65,8 @@ public final class FileUtils {
         ContainerProtos.ReadChunkResponseProto.newBuilder();
     readChunkresponse.setChunkData(info.getProtoBufMessage());
     readChunkresponse.setData(ByteString.copyFrom(data));
-    readChunkresponse.setPipeline(msg.getGetSmallFile().getKey().getPipeline());
+    readChunkresponse.setBlockID(msg.getGetSmallFile().getKey().
+        getKeyData().getBlockID());
 
     ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
         ContainerProtos.GetSmallFileResponseProto.newBuilder();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
index 33eb911..dbd5772 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
@@ -63,11 +63,11 @@ public final class KeyUtils {
     ContainerCache cache = ContainerCache.getInstance(conf);
     Preconditions.checkNotNull(cache);
     try {
-      return cache.getDB(container.getContainerName(), container.getDBPath());
+      return cache.getDB(container.getContainerID(), container.getDBPath());
     } catch (IOException ex) {
       String message =
           String.format("Unable to open DB. DB Name: %s, Path: %s. ex: %s",
-          container.getContainerName(), container.getDBPath(), ex.getMessage());
+          container.getContainerID(), container.getDBPath(), ex.getMessage());
       throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB);
     }
   }
@@ -83,7 +83,7 @@ public final class KeyUtils {
     Preconditions.checkNotNull(container);
     ContainerCache cache = ContainerCache.getInstance(conf);
     Preconditions.checkNotNull(cache);
-    cache.removeDB(container.getContainerName());
+    cache.removeDB(container.getContainerID());
   }
   /**
    * Shutdown all DB Handles.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
index 457c417..3505196 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -66,13 +66,12 @@ public class ChunkManagerImpl implements ChunkManager {
   /**
    * writes a given chunk.
    *
-   * @param pipeline - Name and the set of machines that make this container.
-   * @param keyName - Name of the Key.
+   * @param blockID - ID of the block.
    * @param info - ChunkInfo.
    * @throws StorageContainerException
    */
   @Override
-  public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
+  public void writeChunk(BlockID blockID, ChunkInfo info,
       byte[] data, ContainerProtos.Stage stage)
       throws StorageContainerException {
     // we don't want container manager to go away while we are writing chunks.
@@ -80,13 +79,13 @@ public class ChunkManagerImpl implements ChunkManager {
 
     // TODO : Take keyManager Write lock here.
     try {
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
+      Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+      long containerID = blockID.getContainerID();
+      Preconditions.checkState(containerID >= 0,
+          "Container ID cannot be negative");
       ContainerData container =
-          containerManager.readContainer(containerName);
-      File chunkFile = ChunkUtils.validateChunk(pipeline, container, info);
+          containerManager.readContainer(containerID);
+      File chunkFile = ChunkUtils.validateChunk(container, info);
       File tmpChunkFile = getTmpChunkFile(chunkFile, info);
 
       LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
@@ -96,16 +95,16 @@ public class ChunkManagerImpl implements ChunkManager {
         ChunkUtils.writeData(tmpChunkFile, info, data);
         break;
       case COMMIT_DATA:
-        commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen());
+        commitChunk(tmpChunkFile, chunkFile, containerID, info.getLen());
         break;
       case COMBINED:
         // directly write to the chunk file
         long oldSize = chunkFile.length();
         ChunkUtils.writeData(chunkFile, info, data);
         long newSize = chunkFile.length();
-        containerManager.incrBytesUsed(containerName, newSize - oldSize);
-        containerManager.incrWriteCount(containerName);
-        containerManager.incrWriteBytes(containerName, info.getLen());
+        containerManager.incrBytesUsed(containerID, newSize - oldSize);
+        containerManager.incrWriteCount(containerID);
+        containerManager.incrWriteBytes(containerID, info.getLen());
         break;
       default:
         throw new IOException("Can not identify write operation.");
@@ -136,22 +135,21 @@ public class ChunkManagerImpl implements ChunkManager {
 
   // Commit the chunk by renaming the temporary chunk file to chunk file
   private void commitChunk(File tmpChunkFile, File chunkFile,
-      String containerName, long chunkLen) throws IOException {
+      long containerID, long chunkLen) throws IOException {
     long sizeDiff = tmpChunkFile.length() - chunkFile.length();
     // It is safe to replace here as the earlier chunk if existing should be
     // caught as part of validateChunk
     Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
         StandardCopyOption.REPLACE_EXISTING);
-    containerManager.incrBytesUsed(containerName, sizeDiff);
-    containerManager.incrWriteCount(containerName);
-    containerManager.incrWriteBytes(containerName, chunkLen);
+    containerManager.incrBytesUsed(containerID, sizeDiff);
+    containerManager.incrWriteCount(containerID);
+    containerManager.incrWriteBytes(containerID, chunkLen);
   }
 
   /**
    * reads the data defined by a chunk.
    *
-   * @param pipeline - container pipeline.
-   * @param keyName - Name of the Key
+   * @param blockID - ID of the block.
    * @param info - ChunkInfo.
    * @return byte array
    * @throws StorageContainerException
@@ -159,20 +157,20 @@ public class ChunkManagerImpl implements ChunkManager {
    * TODO: Explore if we need to do that for ozone.
    */
   @Override
-  public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+  public byte[] readChunk(BlockID blockID, ChunkInfo info)
       throws StorageContainerException {
     containerManager.readLock();
     try {
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
+      Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+      long containerID = blockID.getContainerID();
+      Preconditions.checkState(containerID >= 0,
+          "Container ID cannot be negative");
       ContainerData container =
-          containerManager.readContainer(containerName);
-      File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info);
+          containerManager.readContainer(containerID);
+      File chunkFile = ChunkUtils.getChunkFile(container, info);
       ByteBuffer data =  ChunkUtils.readData(chunkFile, info);
-      containerManager.incrReadCount(containerName);
-      containerManager.incrReadBytes(containerName, chunkFile.length());
+      containerManager.incrReadCount(containerID);
+      containerManager.incrReadBytes(containerID, chunkFile.length());
       return data.array();
     } catch (ExecutionException | NoSuchAlgorithmException e) {
       LOG.error("read data failed. error: {}", e);
@@ -191,25 +189,25 @@ public class ChunkManagerImpl implements ChunkManager {
   /**
    * Deletes a given chunk.
    *
-   * @param pipeline - Pipeline.
-   * @param keyName - Key Name
+   * @param blockID - ID of the block.
    * @param info - Chunk Info
    * @throws StorageContainerException
    */
   @Override
-  public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+  public void deleteChunk(BlockID blockID, ChunkInfo info)
       throws StorageContainerException {
     containerManager.readLock();
     try {
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
-      File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
-          .readContainer(containerName), info);
+      Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+      long containerID = blockID.getContainerID();
+      Preconditions.checkState(containerID >= 0,
+          "Container ID cannot be negative");
+
+      File chunkFile = ChunkUtils.getChunkFile(containerManager
+          .readContainer(containerID), info);
       if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
         FileUtil.fullyDelete(chunkFile);
-        containerManager.decrBytesUsed(containerName, chunkFile.length());
+        containerManager.decrBytesUsed(containerID, chunkFile.length());
       } else {
         LOG.error("Not Supported Operation. Trying to delete a " +
             "chunk that is in shared file. chunk info : " + info.toString());

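With Pipeline and key name gone from the chunk API, a call site shrinks to the block coordinates alone. A usage sketch under the new signatures (the blockID values and chunkInfo are placeholders):

    BlockID blockID = new BlockID(containerID, localID);
    chunkManager.writeChunk(blockID, chunkInfo, data, ContainerProtos.Stage.COMBINED);
    byte[] readBack = chunkManager.readChunk(blockID, chunkInfo);
    chunkManager.deleteChunk(blockID, chunkInfo);
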
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index 5e7375c..1893b3b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -24,7 +24,6 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -113,7 +112,9 @@ public class ContainerManagerImpl implements ContainerManager {
   static final Logger LOG =
       LoggerFactory.getLogger(ContainerManagerImpl.class);
 
-  private final ConcurrentSkipListMap<String, ContainerStatus>
+  // TODO: consider primitive collection like eclipse-collections
+  // to avoid autoboxing overhead
+  private final ConcurrentSkipListMap<Long, ContainerStatus>
       containerMap = new ConcurrentSkipListMap<>();
 
   // Use a non-fair RW lock for better throughput, we may revisit this decision
@@ -229,6 +230,7 @@ public class ContainerManagerImpl implements ContainerManager {
     Preconditions.checkNotNull(keyName,
         "Container Name  to container key mapping is null");
 
+    long containerID = Long.parseLong(keyName);
     try {
       String containerFileName = containerName.concat(CONTAINER_EXTENSION);
       String metaFileName = containerName.concat(CONTAINER_META);
@@ -249,7 +251,7 @@ public class ContainerManagerImpl implements ContainerManager {
         // when loading the info we get a null, this often means last time
         // SCM was ending up at some middle phase causing that the metadata
         // was not populated. Such containers are marked as inactive.
-        containerMap.put(keyName, new ContainerStatus(null));
+        containerMap.put(containerID, new ContainerStatus(null));
         return;
       }
       containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
@@ -263,7 +265,7 @@ public class ContainerManagerImpl implements ContainerManager {
         // Hopefully SCM will ask us to delete this container and rebuild it.
         LOG.error("Invalid SHA found for container data. Name :{}"
             + "cowardly refusing to read invalid data", containerName);
-        containerMap.put(keyName, new ContainerStatus(null));
+        containerMap.put(containerID, new ContainerStatus(null));
         return;
       }
 
@@ -295,7 +297,7 @@ public class ContainerManagerImpl implements ContainerManager {
       }).sum();
       containerStatus.setBytesUsed(bytesUsed);
 
-      containerMap.put(keyName, containerStatus);
+      containerMap.put(containerID, containerStatus);
     } catch (IOException | NoSuchAlgorithmException ex) {
       LOG.error("read failed for file: {} ex: {}", containerName,
           ex.getMessage());
@@ -303,7 +305,7 @@ public class ContainerManagerImpl implements ContainerManager {
       // TODO : Add this file to a recovery Queue.
 
       // Remember that this container is busted and we cannot use it.
-      containerMap.put(keyName, new ContainerStatus(null));
+      containerMap.put(containerID, new ContainerStatus(null));
       throw new StorageContainerException("Unable to read container info",
           UNABLE_TO_READ_METADATA_DB);
     } finally {
@@ -316,18 +318,17 @@ public class ContainerManagerImpl implements ContainerManager {
   /**
    * Creates a container with the given name.
    *
-   * @param pipeline -- Nodes which make up this container.
    * @param containerData - Container Name and metadata.
    * @throws StorageContainerException - Exception
    */
   @Override
-  public void createContainer(Pipeline pipeline, ContainerData containerData)
+  public void createContainer(ContainerData containerData)
       throws StorageContainerException {
     Preconditions.checkNotNull(containerData, "Container data cannot be null");
     writeLock();
     try {
-      if (containerMap.containsKey(containerData.getName())) {
-        LOG.debug("container already exists. {}", containerData.getName());
+      if (containerMap.containsKey(containerData.getContainerID())) {
+        LOG.debug("container already exists. {}", containerData.getContainerID());
         throw new StorageContainerException("container already exists.",
             CONTAINER_EXISTS);
       }
@@ -399,7 +400,7 @@ public class ContainerManagerImpl implements ContainerManager {
           location);
       File metadataFile = ContainerUtils.getMetadataFile(containerData,
           location);
-      String containerName = containerData.getContainerName();
+      String containerName = Long.toString(containerData.getContainerID());
 
       if(!overwrite) {
         ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
@@ -446,7 +447,7 @@ public class ContainerManagerImpl implements ContainerManager {
 
       LOG.error("Creation of container failed. Name: {}, we might need to " +
               "cleanup partially created artifacts. ",
-          containerData.getContainerName(), ex);
+          containerData.getContainerID(), ex);
       throw new StorageContainerException("Container creation failed. ",
           ex, CONTAINER_INTERNAL_ERROR);
     } finally {
@@ -459,45 +460,45 @@ public class ContainerManagerImpl implements ContainerManager {
   /**
    * Deletes an existing container.
    *
-   * @param pipeline - nodes that make this container.
-   * @param containerName - name of the container.
+   * @param containerID - ID of the container.
    * @param forceDelete - whether this container should be deleted forcibly.
    * @throws StorageContainerException
    */
   @Override
-  public void deleteContainer(Pipeline pipeline, String containerName,
+  public void deleteContainer(long containerID,
       boolean forceDelete) throws StorageContainerException {
-    Preconditions.checkNotNull(containerName, "Container name cannot be null");
-    Preconditions.checkState(containerName.length() > 0,
-        "Container name length cannot be zero.");
+    Preconditions.checkState(containerID >= 0,
+        "Container ID cannot be negative.");
     writeLock();
     try {
-      if (isOpen(pipeline.getContainerName())) {
+      if (isOpen(containerID)) {
         throw new StorageContainerException(
             "Deleting an open container is not allowed.",
             UNCLOSED_CONTAINER_IO);
       }
 
-      ContainerStatus status = containerMap.get(containerName);
+      ContainerStatus status = containerMap.get(containerID);
       if (status == null) {
-        LOG.debug("No such container. Name: {}", containerName);
-        throw new StorageContainerException("No such container. Name : " +
-            containerName, CONTAINER_NOT_FOUND);
+        LOG.debug("No such container. ID: {}", containerID);
+        throw new StorageContainerException("No such container. ID : " +
+            containerID, CONTAINER_NOT_FOUND);
       }
       if (status.getContainer() == null) {
-        LOG.debug("Invalid container data. Name: {}", containerName);
+        LOG.debug("Invalid container data. ID: {}", containerID);
         throw new StorageContainerException("Invalid container data. Name : " +
-            containerName, CONTAINER_NOT_FOUND);
+            containerID, CONTAINER_NOT_FOUND);
       }
       ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete);
-      containerMap.remove(containerName);
+      containerMap.remove(containerID);
     } catch (StorageContainerException e) {
       throw e;
     } catch (IOException e) {
       // TODO : An I/O error during delete can leave partial artifacts on the
       // disk. We will need the cleaner thread to cleanup this information.
-      LOG.error("Failed to cleanup container. Name: {}", containerName, e);
-      throw new StorageContainerException(containerName, e, IO_EXCEPTION);
+      String errMsg = String.format("Failed to cleanup container. ID: %d",
+          containerID);
+      LOG.error(errMsg, e);
+      throw new StorageContainerException(errMsg, e, IO_EXCEPTION);
     } finally {
       writeUnlock();
     }
@@ -511,25 +512,29 @@ public class ContainerManagerImpl implements ContainerManager {
    * time. It is possible that using this iteration you can miss certain
    * container from the listing.
    *
-   * @param prefix -  Return keys that match this prefix.
+   * @param startContainerID -  Return containers with ID >= startContainerID.
    * @param count - how many to return
-   * @param prevKey - Previous Key Value or empty String.
    * @param data - Actual containerData
    * @throws StorageContainerException
    */
   @Override
-  public void listContainer(String prefix, long count, String prevKey,
+  public void listContainer(long startContainerID, long count,
       List<ContainerData> data) throws StorageContainerException {
-    // TODO : Support list with Prefix and PrevKey
     Preconditions.checkNotNull(data,
         "Internal assertion: data cannot be null");
+    Preconditions.checkState(startContainerID >= 0,
+        "Start container ID cannot be negative");
+    Preconditions.checkState(count > 0,
+        "max number of containers returned " +
+            "must be positive");
+
     readLock();
     try {
-      ConcurrentNavigableMap<String, ContainerStatus> map;
-      if (prevKey == null || prevKey.isEmpty()) {
+      ConcurrentNavigableMap<Long, ContainerStatus> map;
+      if (startContainerID == 0) {
         map = containerMap.tailMap(containerMap.firstKey(), true);
       } else {
-        map = containerMap.tailMap(prevKey, false);
+        map = containerMap.tailMap(startContainerID, false);
       }
 
       int currentCount = 0;
@@ -549,24 +554,23 @@ public class ContainerManagerImpl implements ContainerManager {
   /**
    * Get metadata about a specific container.
    *
-   * @param containerName - Name of the container
+   * @param containerID - ID of the container
    * @return ContainerData - Container Data.
    * @throws StorageContainerException
    */
   @Override
-  public ContainerData readContainer(String containerName) throws
-      StorageContainerException {
-    Preconditions.checkNotNull(containerName, "Container name cannot be null");
-    Preconditions.checkState(containerName.length() > 0,
-        "Container name length cannot be zero.");
-    if (!containerMap.containsKey(containerName)) {
-      throw new StorageContainerException("Unable to find the container. Name: 
"
-          + containerName, CONTAINER_NOT_FOUND);
+  public ContainerData readContainer(long containerID)
+      throws StorageContainerException {
+    Preconditions.checkState(containerID >= 0,
+        "Container ID cannot be negative.");
+    if (!containerMap.containsKey(containerID)) {
+      throw new StorageContainerException("Unable to find the container. ID: "
+          + containerID, CONTAINER_NOT_FOUND);
     }
-    ContainerData cData = containerMap.get(containerName).getContainer();
+    ContainerData cData = containerMap.get(containerID).getContainer();
     if (cData == null) {
-      throw new StorageContainerException("Invalid container data. Name: "
-          + containerName, CONTAINER_INTERNAL_ERROR);
+      throw new StorageContainerException("Invalid container data. ID: "
+          + containerID, CONTAINER_INTERNAL_ERROR);
     }
     return cData;
   }
@@ -575,13 +579,13 @@ public class ContainerManagerImpl implements ContainerManager {
    * Closes an open container; if it is already closed or does not exist, a
    * StorageContainerException is thrown.
    *
-   * @param containerName - Name of the container.
+   * @param containerID - ID of the container.
    * @throws StorageContainerException
    */
   @Override
-  public void closeContainer(String containerName)
+  public void closeContainer(long containerID)
       throws StorageContainerException, NoSuchAlgorithmException {
-    ContainerData containerData = readContainer(containerName);
+    ContainerData containerData = readContainer(containerID);
     containerData.closeContainer();
     writeContainerInfo(containerData, true);
     MetadataStore db = KeyUtils.getDB(containerData, conf);
@@ -602,15 +606,13 @@ public class ContainerManagerImpl implements ContainerManager {
     // issues.
 
     ContainerStatus status = new ContainerStatus(containerData);
-    containerMap.put(containerName, status);
+    containerMap.put(containerID, status);
   }
 
   @Override
-  public void updateContainer(Pipeline pipeline, String containerName,
-      ContainerData data, boolean forceUpdate)
-      throws StorageContainerException {
-    Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-    Preconditions.checkNotNull(containerName, "Container name cannot be null");
+  public void updateContainer(long containerID, ContainerData data,
+      boolean forceUpdate) throws StorageContainerException {
+    Preconditions.checkState(containerID >= 0, "Container ID cannot be 
negative.");
     Preconditions.checkNotNull(data, "Container data cannot be null");
     FileOutputStream containerStream = null;
     DigestOutputStream dos = null;
@@ -618,9 +620,9 @@ public class ContainerManagerImpl implements ContainerManager {
     File containerFileBK = null, containerFile = null;
     boolean deleted = false;
 
-    if(!containerMap.containsKey(containerName)) {
+    if (!containerMap.containsKey(containerID)) {
       throw new StorageContainerException("Container doesn't exist. Name :"
-          + containerName, CONTAINER_NOT_FOUND);
+          + containerID, CONTAINER_NOT_FOUND);
     }
 
     try {
@@ -633,7 +635,7 @@ public class ContainerManagerImpl implements ContainerManager {
 
     try {
       Path location = locationManager.getContainerPath();
-      ContainerData orgData = containerMap.get(containerName).getContainer();
+      ContainerData orgData = containerMap.get(containerID).getContainer();
       if (orgData == null) {
         // updating an invalid container
         throw new StorageContainerException("Update a container with invalid" +
@@ -642,7 +644,7 @@ public class ContainerManagerImpl implements ContainerManager {
 
       if (!forceUpdate && !orgData.isOpen()) {
         throw new StorageContainerException(
-            "Update a closed container is not allowed. Name: " + containerName,
+            "Update a closed container is not allowed. ID: " + containerID,
             UNSUPPORTED_REQUEST);
       }
 
@@ -652,7 +654,7 @@ public class ContainerManagerImpl implements ContainerManager {
       if (!forceUpdate) {
         if (!containerFile.exists() || !containerFile.canWrite()) {
           throw new StorageContainerException(
-              "Container file not exists or corrupted. Name: " + containerName,
+              "Container file not exists or corrupted. ID: " + containerID,
               CONTAINER_INTERNAL_ERROR);
         }
 
@@ -672,7 +674,7 @@ public class ContainerManagerImpl implements ContainerManager {
 
       // Update the in-memory map
       ContainerStatus newStatus = new ContainerStatus(data);
-      containerMap.replace(containerName, newStatus);
+      containerMap.replace(containerID, newStatus);
     } catch (IOException e) {
       // Restore the container file from backup
       if(containerFileBK != null && containerFileBK.exists() && deleted) {
@@ -683,8 +685,8 @@ public class ContainerManagerImpl implements ContainerManager {
               CONTAINER_INTERNAL_ERROR);
         } else {
           throw new StorageContainerException(
-              "Failed to restore container data from the backup. Name: "
-                  + containerName, CONTAINER_INTERNAL_ERROR);
+              "Failed to restore container data from the backup. ID: "
+                  + containerID, CONTAINER_INTERNAL_ERROR);
         }
       } else {
         throw new StorageContainerException(
@@ -711,22 +713,22 @@ public class ContainerManagerImpl implements ContainerManager {
   /**
    * Checks if a container is open.
    *
-   * @param containerName - Name of the container.
+   * @param containerID - ID of the container.
    * @return true if the container is open, false otherwise.
    * @throws StorageContainerException - Throws Exception if we are not able to
    *                                   find the container.
    */
   @Override
-  public boolean isOpen(String containerName) throws StorageContainerException {
-    final ContainerStatus status = containerMap.get(containerName);
+  public boolean isOpen(long containerID) throws StorageContainerException {
+    final ContainerStatus status = containerMap.get(containerID);
     if (status == null) {
       throw new StorageContainerException(
-          "Container status not found: " + containerName, CONTAINER_NOT_FOUND);
+          "Container status not found: " + containerID, CONTAINER_NOT_FOUND);
     }
     final ContainerData cData = status.getContainer();
     if (cData == null) {
       throw new StorageContainerException(
-          "Container not found: " + containerName, CONTAINER_NOT_FOUND);
+          "Container not found: " + containerID, CONTAINER_NOT_FOUND);
     }
     return cData.isOpen();
   }
@@ -746,7 +748,7 @@ public class ContainerManagerImpl implements ContainerManager {
 
 
   @VisibleForTesting
-  public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
+  public ConcurrentSkipListMap<Long, ContainerStatus> getContainerMap() {
     return containerMap;
   }
 
@@ -901,7 +903,7 @@ public class ContainerManagerImpl implements ContainerManager {
     for (ContainerStatus container: containers) {
       StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
           StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
-      ciBuilder.setContainerName(container.getContainer().getContainerName())
+      ciBuilder.setContainerID(container.getContainer().getContainerID())
           .setSize(container.getContainer().getMaxSize())
           .setUsed(container.getContainer().getBytesUsed())
           .setKeyCount(container.getContainer().getKeyCount())
@@ -966,7 +968,7 @@ public class ContainerManagerImpl implements ContainerManager {
   }
 
   @Override
-  public void incrPendingDeletionBlocks(int numBlocks, String containerId) {
+  public void incrPendingDeletionBlocks(int numBlocks, long containerId) {
     writeLock();
     try {
       ContainerStatus status = containerMap.get(containerId);
@@ -977,7 +979,7 @@ public class ContainerManagerImpl implements ContainerManager {
   }
 
   @Override
-  public void decrPendingDeletionBlocks(int numBlocks, String containerId) {
+  public void decrPendingDeletionBlocks(int numBlocks, long containerId) {
     writeLock();
     try {
       ContainerStatus status = containerMap.get(containerId);
@@ -990,35 +992,35 @@ public class ContainerManagerImpl implements ContainerManager {
   /**
    * Increase the read count of the container.
    *
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    */
   @Override
-  public void incrReadCount(String containerName) {
-    ContainerStatus status = containerMap.get(containerName);
+  public void incrReadCount(long containerId) {
+    ContainerStatus status = containerMap.get(containerId);
     status.incrReadCount();
   }
 
-  public long getReadCount(String containerName) {
-    ContainerStatus status = containerMap.get(containerName);
+  public long getReadCount(long containerId) {
+    ContainerStatus status = containerMap.get(containerId);
     return status.getReadCount();
   }
 
   /**
    * Increase the read counter for bytes read from the container.
    *
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @param readBytes     - bytes read from the container.
    */
   @Override
-  public void incrReadBytes(String containerName, long readBytes) {
-    ContainerStatus status = containerMap.get(containerName);
+  public void incrReadBytes(long containerId, long readBytes) {
+    ContainerStatus status = containerMap.get(containerId);
     status.incrReadBytes(readBytes);
   }
 
-  public long getReadBytes(String containerName) {
+  public long getReadBytes(long containerId) {
     readLock();
     try {
-      ContainerStatus status = containerMap.get(containerName);
+      ContainerStatus status = containerMap.get(containerId);
       return status.getReadBytes();
     } finally {
       readUnlock();
@@ -1028,76 +1030,76 @@ public class ContainerManagerImpl implements ContainerManager {
   /**
    * Increase the write count of the container.
    *
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    */
   @Override
-  public void incrWriteCount(String containerName) {
-    ContainerStatus status = containerMap.get(containerName);
+  public void incrWriteCount(long containerId) {
+    ContainerStatus status = containerMap.get(containerId);
     status.incrWriteCount();
   }
 
-  public long getWriteCount(String containerName) {
-    ContainerStatus status = containerMap.get(containerName);
+  public long getWriteCount(long containerId) {
+    ContainerStatus status = containerMap.get(containerId);
     return status.getWriteCount();
   }
 
   /**
    * Increase the write counter for bytes written into the container.
    *
-   * @param containerName - Name of the container.
+   * @param containerId   - ID of the container.
    * @param writeBytes    - bytes written into the container.
    */
   @Override
-  public void incrWriteBytes(String containerName, long writeBytes) {
-    ContainerStatus status = containerMap.get(containerName);
+  public void incrWriteBytes(long containerId, long writeBytes) {
+    ContainerStatus status = containerMap.get(containerId);
     status.incrWriteBytes(writeBytes);
   }
 
-  public long getWriteBytes(String containerName) {
-    ContainerStatus status = containerMap.get(containerName);
+  public long getWriteBytes(long containerId) {
+    ContainerStatus status = containerMap.get(containerId);
     return status.getWriteBytes();
   }
 
   /**
    * Increase the bytes used by the container.
    *
-   * @param containerName - Name of the container.
+   * @param containerId   - ID of the container.
    * @param used          - additional bytes used by the container.
    * @return the current bytes used.
    */
   @Override
-  public long incrBytesUsed(String containerName, long used) {
-    ContainerStatus status = containerMap.get(containerName);
+  public long incrBytesUsed(long containerId, long used) {
+    ContainerStatus status = containerMap.get(containerId);
     return status.incrBytesUsed(used);
   }
 
   /**
    * Decrease the bytes used by the container.
    *
-   * @param containerName - Name of the container.
+   * @param containerId   - ID of the container.
    * @param used          - additional bytes reclaimed by the container.
    * @return the current bytes used.
    */
   @Override
-  public long decrBytesUsed(String containerName, long used) {
-    ContainerStatus status = containerMap.get(containerName);
+  public long decrBytesUsed(long containerId, long used) {
+    ContainerStatus status = containerMap.get(containerId);
     return status.decrBytesUsed(used);
   }
 
-  public long getBytesUsed(String containerName) {
-    ContainerStatus status = containerMap.get(containerName);
+  public long getBytesUsed(long containerId) {
+    ContainerStatus status = containerMap.get(containerId);
     return status.getBytesUsed();
   }
 
   /**
    * Get the number of keys in the container.
    *
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @return the current key count.
    */
   @Override
-  public long getNumKeys(String containerName) {
-    ContainerStatus status = containerMap.get(containerName);
+  public long getNumKeys(long containerId) {
+    ContainerStatus status = containerMap.get(containerId);
     return status.getNumKeys();
   }
 
   /**

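A caller-side note: after this change every ContainerManager entry point is
keyed by a numeric container ID rather than a name/Pipeline pair. The sketch
below is illustrative only and not part of the patch; the manager handle and
the ID value 42 are hypothetical, and the ContainerManager import path is
assumed from the surrounding code.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
    import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;

    public class ContainerIdApiSketch {
      static void exercise(ContainerManager manager) throws Exception {
        long containerID = 42L;                 // hypothetical container ID
        ContainerData data = manager.readContainer(containerID);

        // Paged listing: startContainerID == 0 means "start from the first
        // key in the map"; count must be positive.
        List<ContainerData> batch = new ArrayList<>();
        manager.listContainer(0L, 100, batch);

        // Deleting an open container fails with UNCLOSED_CONTAINER_IO, so
        // check isOpen first.
        if (!manager.isOpen(containerID)) {
          manager.deleteContainer(containerID, false /* forceDelete */);
        }
      }
    }
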
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index d319565..46bd842 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -186,7 +187,7 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCreateContainer().getContainerData().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
@@ -230,7 +231,7 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCreateContainer().getContainerData().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
@@ -273,7 +274,7 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCreateContainer().getContainerData().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
@@ -318,17 +319,14 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-
-    Pipeline pipeline = Pipeline.getFromProtoBuf(
-        msg.getUpdateContainer().getPipeline());
-    String containerName = msg.getUpdateContainer()
-        .getContainerData().getName();
+    long containerID = msg.getUpdateContainer()
+        .getContainerData().getContainerID();
 
     ContainerData data = ContainerData.getFromProtBuf(
         msg.getUpdateContainer().getContainerData(), conf);
     boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
-    this.containerManager.updateContainer(
-        pipeline, containerName, data, forceUpdate);
+    this.containerManager.updateContainer(containerID,
+        data, forceUpdate);
     return ContainerUtils.getContainerResponse(msg);
   }
 
@@ -349,8 +347,9 @@ public class Dispatcher implements ContainerDispatcher {
       return ContainerUtils.malformedRequest(msg);
     }
 
-    String name = msg.getReadContainer().getName();
-    ContainerData container = this.containerManager.readContainer(name);
+    long containerID = msg.getReadContainer().getContainerID();
+    ContainerData container =
+        this.containerManager.readContainer(containerID);
     return ContainerUtils.getReadContainerResponse(msg, container);
   }
 
@@ -370,12 +369,9 @@ public class Dispatcher implements ContainerDispatcher {
       return ContainerUtils.malformedRequest(msg);
     }
 
-    Pipeline pipeline = Pipeline.getFromProtoBuf(
-        msg.getDeleteContainer().getPipeline());
-    Preconditions.checkNotNull(pipeline);
-    String name = msg.getDeleteContainer().getName();
+    long containerID = msg.getDeleteContainer().getContainerID();
     boolean forceDelete = msg.getDeleteContainer().getForceDelete();
-    this.containerManager.deleteContainer(pipeline, name, forceDelete);
+    this.containerManager.deleteContainer(containerID, forceDelete);
     return ContainerUtils.getContainerResponse(msg);
   }
 
@@ -401,7 +397,7 @@ public class Dispatcher implements ContainerDispatcher {
         msg.getCreateContainer().getPipeline());
     Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
 
-    this.containerManager.createContainer(pipeline, cData);
+    this.containerManager.createContainer(cData);
     return ContainerUtils.getContainerResponse(msg);
   }
 
@@ -420,14 +416,12 @@ public class Dispatcher implements ContainerDispatcher {
             msg.getTraceID());
         return ContainerUtils.malformedRequest(msg);
       }
-      Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer()
-          .getPipeline());
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      long containerID = msg.getCloseContainer().getContainerID();
+      if (!this.containerManager.isOpen(containerID)) {
         throw new StorageContainerException("Attempting to close a closed " +
             "container.", CLOSED_CONTAINER_IO);
       }
-      this.containerManager.closeContainer(pipeline.getContainerName());
+      this.containerManager.closeContainer(containerID);
       return ContainerUtils.getContainerResponse(msg);
     } catch (NoSuchAlgorithmException e) {
       throw new StorageContainerException("No such Algorithm", e,
@@ -449,11 +443,9 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    String keyName = msg.getWriteChunk().getKeyName();
-    Pipeline pipeline = Pipeline.getFromProtoBuf(
-        msg.getWriteChunk().getPipeline());
-    Preconditions.checkNotNull(pipeline);
-    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+    BlockID blockID = BlockID.getFromProtobuf(
+        msg.getWriteChunk().getBlockID());
+    if (!this.containerManager.isOpen(blockID.getContainerID())) {
       throw new StorageContainerException("Write to closed container.",
           CLOSED_CONTAINER_IO);
     }
@@ -469,7 +461,7 @@ public class Dispatcher implements ContainerDispatcher {
 
     }
     this.containerManager.getChunkManager()
-        .writeChunk(pipeline, keyName, chunkInfo,
+        .writeChunk(blockID, chunkInfo,
             data, msg.getWriteChunk().getStage());
 
     return ChunkUtils.getChunkResponse(msg);
@@ -489,17 +481,13 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-
-    String keyName = msg.getReadChunk().getKeyName();
-    Pipeline pipeline = Pipeline.getFromProtoBuf(
-        msg.getReadChunk().getPipeline());
-    Preconditions.checkNotNull(pipeline);
-
+    BlockID blockID = BlockID.getFromProtobuf(
+        msg.getReadChunk().getBlockID());
     ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk()
         .getChunkData());
     Preconditions.checkNotNull(chunkInfo);
-    byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
-        keyName, chunkInfo);
+    byte[] data = this.containerManager.getChunkManager()
+        .readChunk(blockID, chunkInfo);
     metrics.incContainerBytesStats(Type.ReadChunk, data.length);
     return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
   }
@@ -519,11 +507,10 @@ public class Dispatcher implements ContainerDispatcher {
       return ContainerUtils.malformedRequest(msg);
     }
 
-    String keyName = msg.getDeleteChunk().getKeyName();
-    Pipeline pipeline = Pipeline.getFromProtoBuf(
-        msg.getDeleteChunk().getPipeline());
-    Preconditions.checkNotNull(pipeline);
-    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+    BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteChunk()
+        .getBlockID());
+    long containerID = blockID.getContainerID();
+    if (!this.containerManager.isOpen(containerID)) {
       throw new StorageContainerException("Write to closed container.",
           CLOSED_CONTAINER_IO);
     }
@@ -531,7 +518,7 @@ public class Dispatcher implements ContainerDispatcher {
         .getChunkData());
     Preconditions.checkNotNull(chunkInfo);
 
-    this.containerManager.getChunkManager().deleteChunk(pipeline, keyName,
+    this.containerManager.getChunkManager().deleteChunk(blockID,
         chunkInfo);
     return ChunkUtils.getChunkResponse(msg);
   }
@@ -550,15 +537,16 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
-    Preconditions.checkNotNull(pipeline);
-    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+    BlockID blockID = BlockID.getFromProtobuf(
+        msg.getPutKey().getKeyData().getBlockID());
+    long containerID = blockID.getContainerID();
+    if (!this.containerManager.isOpen(containerID)) {
       throw new StorageContainerException("Write to closed container.",
           CLOSED_CONTAINER_IO);
     }
     KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
     Preconditions.checkNotNull(keyData);
-    this.containerManager.getKeyManager().putKey(pipeline, keyData);
+    this.containerManager.getKeyManager().putKey(keyData);
     long numBytes = keyData.getProtoBufMessage().toByteArray().length;
     metrics.incContainerBytesStats(Type.PutKey, numBytes);
     return KeyUtils.getKeyResponse(msg);
@@ -601,17 +589,15 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    Pipeline pipeline =
-        Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline());
-    Preconditions.checkNotNull(pipeline);
-    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+    BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteKey()
+        .getBlockID());
+    Preconditions.checkNotNull(blockID);
+    long containerID = blockID.getContainerID();
+    if (!this.containerManager.isOpen(containerID)) {
       throw new StorageContainerException("Write to closed container.",
           CLOSED_CONTAINER_IO);
     }
-    String keyName = msg.getDeleteKey().getName();
-    Preconditions.checkNotNull(keyName);
-    Preconditions.checkState(!keyName.isEmpty());
-    this.containerManager.getKeyManager().deleteKey(pipeline, keyName);
+    this.containerManager.getKeyManager().deleteKey(blockID);
     return KeyUtils.getKeyResponse(msg);
   }
 
@@ -632,12 +618,11 @@ public class Dispatcher implements ContainerDispatcher {
     }
     try {
 
-      Pipeline pipeline =
-          Pipeline.getFromProtoBuf(msg.getPutSmallFile()
-              .getKey().getPipeline());
+      BlockID blockID = BlockID.getFromProtobuf(
+          msg.getPutSmallFile().getKey().getKeyData().getBlockID());
+      long containerID = blockID.getContainerID();
 
-      Preconditions.checkNotNull(pipeline);
-      if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      if (!this.containerManager.isOpen(containerID)) {
         throw new StorageContainerException("Write to closed container.",
             CLOSED_CONTAINER_IO);
       }
@@ -648,12 +633,12 @@ public class Dispatcher implements ContainerDispatcher {
       byte[] data = msg.getPutSmallFile().getData().toByteArray();
 
       metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
-      this.containerManager.getChunkManager().writeChunk(pipeline, keyData
-          .getKeyName(), chunkInfo, data, ContainerProtos.Stage.COMBINED);
+      this.containerManager.getChunkManager().writeChunk(blockID,
+          chunkInfo, data, ContainerProtos.Stage.COMBINED);
       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
       chunks.add(chunkInfo.getProtoBufMessage());
       keyData.setChunks(chunks);
-      this.containerManager.getKeyManager().putKey(pipeline, keyData);
+      this.containerManager.getKeyManager().putKey(keyData);
       return FileUtils.getPutFileResponse(msg);
     } catch (StorageContainerException e) {
       return ContainerUtils.logAndReturnError(LOG, e, msg);
@@ -680,12 +665,7 @@ public class Dispatcher implements ContainerDispatcher {
       return ContainerUtils.malformedRequest(msg);
     }
     try {
-      Pipeline pipeline =
-          Pipeline.getFromProtoBuf(msg.getGetSmallFile()
-              .getKey().getPipeline());
-
       long bytes = 0;
-      Preconditions.checkNotNull(pipeline);
       KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
           .getKey().getKeyData());
       KeyData data = this.containerManager.getKeyManager().getKey(keyData);
@@ -694,9 +674,8 @@ public class Dispatcher implements ContainerDispatcher {
         bytes += chunk.getSerializedSize();
         ByteString current =
             ByteString.copyFrom(this.containerManager.getChunkManager()
-                .readChunk(
-                    pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
-                        chunk)));
+                .readChunk(keyData.getBlockID(),
+                    ChunkInfo.getFromProtoBuf(chunk)));
         dataBuf = dataBuf.concat(current);
         c = chunk;
       }

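On the Dispatcher side, the same idea applies to data-path requests: a
Pipeline plus key name is replaced by the BlockID (containerID, localID)
carried in the protobuf message. Below is a minimal sketch of the new
addressing, mirroring the writeChunk path above; the BlockID constructor and
the local variables (containerID, localID, chunkInfo, data) are assumptions
for illustration, not part of the patch.

    import org.apache.hadoop.hdds.client.BlockID;
    // StorageContainerException, CLOSED_CONTAINER_IO, ContainerProtos and
    // the containerManager handle are the same ones used in the diff.

    // Route the write purely by block identity:
    BlockID blockID = new BlockID(containerID, localID);  // assumed ctor
    if (!containerManager.isOpen(blockID.getContainerID())) {
      throw new StorageContainerException("Write to closed container.",
          CLOSED_CONTAINER_IO);
    }
    containerManager.getChunkManager()
        .writeChunk(blockID, chunkInfo, data, ContainerProtos.Stage.COMBINED);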
