HDFS-10238. Ozone : Add chunk persistence. Contributed by Anu Engineer.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ac97b18
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ac97b18
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ac97b18

Branch: refs/heads/HDFS-7240
Commit: 4ac97b1815ba1caf6477c736c10e433ee7d5bc50
Parents: 55d4ee7
Author: Chris Nauroth <cnaur...@apache.org>
Authored: Fri Apr 1 10:52:38 2016 -0700
Committer: Chris Nauroth <cnaur...@apache.org>
Committed: Fri Apr 1 10:52:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   4 +-
 .../container/common/helpers/ChunkInfo.java     | 184 ++++++++++++
 .../container/common/helpers/ChunkUtils.java    | 299 +++++++++++++++++++
 .../common/helpers/ContainerUtils.java          |  92 +++++-
 .../container/common/helpers/Pipeline.java      |  10 +-
 .../container/common/helpers/package-info.java  |   3 +-
 .../container/common/impl/ChunkManagerImpl.java | 149 +++++++++
 .../common/impl/ContainerManagerImpl.java       |  38 ++-
 .../ozone/container/common/impl/Dispatcher.java | 193 ++++++++++--
 .../common/interfaces/ChunkManager.java         |  68 +++++
 .../common/interfaces/ContainerManager.java     |  13 +
 .../container/ozoneimpl/OzoneContainer.java     |   5 +
 .../main/proto/DatanodeContainerProtocol.proto  |  45 ++-
 .../ozone/container/ContainerTestHelper.java    | 159 +++++++++-
 .../common/impl/TestContainerPersistence.java   | 244 ++++++++++++++-
 .../container/ozoneimpl/TestOzoneContainer.java |  54 +++-
 .../transport/server/TestContainerServer.java   |  17 +-
 17 files changed, 1461 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 9e52853..bebbb78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -65,8 +65,8 @@ public final class OzoneConsts {
   public static final String CONTAINER_ROOT_PREFIX = "repository";
 
   public static final String CONTAINER_DB = "container.db";
-
-
+  public static final String FILE_HASH = "SHA-256";
+  public final static String CHUNK_OVERWRITE = "OverWriteRequested";
 
   /**
    * Supports Bucket Versioning.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
new file mode 100644
index 0000000..7a3d449
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Java class that represents the ChunkInfo ProtoBuf class. This helper class
+ * allows us to convert between the protobuf format and plain Java objects.
+ */
+public class ChunkInfo {
+  private final String chunkName;
+  private final long offset;
+  private final long len;
+  private String checksum;
+  private final Map<String, String> metadata;
+
+
+  /**
+   * Constructs a ChunkInfo.
+   *
+   * @param chunkName - File Name where chunk lives.
+   * @param offset    - offset where Chunk Starts.
+   * @param len       - Length of the Chunk.
+   */
+  public ChunkInfo(String chunkName, long offset, long len) {
+    this.chunkName = chunkName;
+    this.offset = offset;
+    this.len = len;
+    this.metadata = new TreeMap<>();
+  }
+
+  /**
+   * Adds metadata.
+   *
+   * @param key   - Key Name.
+   * @param value - Value.
+   * @throws IOException
+   */
+  public void addMetadata(String key, String value) throws IOException {
+    synchronized (this.metadata) {
+      if (this.metadata.containsKey(key)) {
+        throw new IOException("This key already exists. Key " + key);
+      }
+      metadata.put(key, value);
+    }
+  }
+
+  /**
+   * Gets a ChunkInfo object from the protobuf definitions.
+   *
+   * @param info - Protobuf class
+   * @return ChunkInfo
+   * @throws IOException
+   */
+  public static ChunkInfo getFromProtoBuf(ContainerProtos.ChunkInfo info)
+      throws IOException {
+    Preconditions.checkNotNull(info);
+
+    ChunkInfo chunkInfo = new ChunkInfo(info.getChunkName(), info.getOffset(),
+        info.getLen());
+
+    for (int x = 0; x < info.getMetadataCount(); x++) {
+      chunkInfo.addMetadata(info.getMetadata(x).getKey(),
+          info.getMetadata(x).getValue());
+    }
+
+
+    if (info.hasChecksum()) {
+      chunkInfo.setChecksum(info.getChecksum());
+    }
+    return chunkInfo;
+  }
+
+  /**
+   * Returns a ProtoBuf Message from ChunkInfo.
+   *
+   * @return Protocol Buffer Message
+   */
+  public ContainerProtos.ChunkInfo getProtoBufMessage() {
+    ContainerProtos.ChunkInfo.Builder builder = ContainerProtos
+        .ChunkInfo.newBuilder();
+
+    builder.setChunkName(this.getChunkName());
+    builder.setOffset(this.getOffset());
+    builder.setLen(this.getLen());
+    if (this.getChecksum() != null && !this.getChecksum().isEmpty()) {
+      builder.setChecksum(this.getChecksum());
+    }
+
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      ContainerProtos.KeyValue.Builder keyValBuilder =
+          ContainerProtos.KeyValue.newBuilder();
+      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+          .setValue(entry.getValue()).build());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Returns the chunkName.
+   *
+   * @return - String
+   */
+  public String getChunkName() {
+    return chunkName;
+  }
+
+  /**
+   * Gets the start offset of the given chunk in the physical file.
+   *
+   * @return - long
+   */
+  public long getOffset() {
+    return offset;
+  }
+
+  /**
+   * Returns the length of the Chunk.
+   *
+   * @return long
+   */
+  public long getLen() {
+    return len;
+  }
+
+  /**
+   * Returns the SHA256 value of this chunk.
+   *
+   * @return - Hash String
+   */
+  public String getChecksum() {
+    return checksum;
+  }
+
+  /**
+   * Sets the Hash value of this chunk.
+   *
+   * @param checksum - Hash String.
+   */
+  public void setChecksum(String checksum) {
+    this.checksum = checksum;
+  }
+
+  /**
+   * Returns Metadata associated with this Chunk.
+   *
+   * @return - Map of Key,values.
+   */
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
+  @Override
+  public String toString() {
+    return "ChunkInfo{" +
+        "chunkName='" + chunkName +
+        ", offset=" + offset +
+        ", len=" + len +
+        '}';
+  }
+}
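
A minimal usage sketch of the helper above (illustrative only, not part of
this commit; the chunk name and length are made up, and the snippet is
assumed to run inside a method that declares IOException):

    // Build a ChunkInfo, opt in to overwrite via metadata, and round-trip
    // it through the protobuf form.
    ChunkInfo info = new ChunkInfo("chunk_0001", 0, 1024);
    info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
    ContainerProtos.ChunkInfo wire = info.getProtoBufMessage();
    ChunkInfo decoded = ChunkInfo.getFromProtoBuf(wire);
    // decoded carries the same name, offset, length and metadata as info.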

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
new file mode 100644
index 0000000..03370ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Set of utility functions used by the chunk manager.
+ */
+public final class ChunkUtils {
+
+  /* Never constructed. */
+  private ChunkUtils() {
+  }
+
+  /**
+   * Checks if we are getting a request to overwrite an existing range of
+   * a chunk.
+   *
+   * @param chunkFile - File
+   * @param chunkInfo - Chunk information for the requested write.
+   * @return bool
+   */
+  public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
+      chunkInfo) {
+
+    if (!chunkFile.exists()) {
+      return false;
+    }
+
+    long offset = chunkInfo.getOffset();
+    return offset < chunkFile.length();
+  }
+
+  /**
+   * Overwrite is permitted if and only if the user explicitly asks for it. We
+   * permit this iff the key/value pair contains a flag called
+   * [OverWriteRequested, true].
+   *
+   * @param chunkInfo - Chunk info
+   * @return true if the user asks for it.
+   */
+  public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
+    String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
+    return (overWrite != null) &&
+        (!overWrite.isEmpty()) &&
+        (Boolean.valueOf(overWrite));
+  }
+
+  /**
+   * Validates chunk data and returns the chunk file that we are expected to
+   * write the data to.
+   *
+   * @param pipeline - pipeline.
+   * @param data     - container data.
+   * @param info     - chunk info.
+   * @return File
+   * @throws IOException
+   */
+  public static File validateChunk(Pipeline pipeline, ContainerData data,
+                                   ChunkInfo info) throws IOException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+    File chunkFile = getChunkFile(pipeline, data, info);
+    if (ChunkUtils.isOverWriteRequested(chunkFile, info)) {
+      if (!ChunkUtils.isOverWritePermitted(info)) {
+        log.error("Rejecting write chunk request. Chunk overwrite " +
+            "without explicit request. {}", info.toString());
+        throw new IOException("Rejecting write chunk request. OverWrite " +
+            "flag required." + info.toString());
+      }
+    }
+    return chunkFile;
+  }
+
+  /**
+   * Validates that the container data directory exists and returns the
+   * path to the chunk file.
+   *
+   * @param pipeline - Container Info.
+   * @param data     - Container Data
+   * @param info     - Chunk info
+   * @return - File.
+   * @throws IOException
+   */
+  public static File getChunkFile(Pipeline pipeline, ContainerData data,
+                                  ChunkInfo info) throws IOException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+    if (data == null) {
+      log.error("Invalid container Name: {}", pipeline.getContainerName());
+      throw new IOException("Unable to find the container Name: " +
+          pipeline.getContainerName());
+    }
+
+    File dataDir = ContainerUtils.getDataDirectory(data).toFile();
+    if (!dataDir.exists()) {
+      log.error("Unable to find the data directory: {}", dataDir);
+      throw new IOException("Unable to find the data directory: " + dataDir);
+    }
+
+    return dataDir.toPath().resolve(info.getChunkName()).toFile();
+
+  }
+
+  /**
+   * Writes the given data buffer to the location in the chunk file that
+   * chunkInfo specifies.
+   *
+   * @param chunkFile - File to write data to.
+   * @param chunkInfo - Chunk metadata describing the offset and length.
+   * @param data      - The data buffer to write.
+   * @throws IOException
+   */
+  public static void writeData(File chunkFile, ChunkInfo chunkInfo,
+                               byte[] data)
+      throws IOException, ExecutionException, InterruptedException,
+      NoSuchAlgorithmException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+    if (data.length != chunkInfo.getLen()) {
+      String err = String.format("data array does not match the length " +
+              "specified. DataLen: %d Byte Array: %d",
+          chunkInfo.getLen(), data.length);
+      log.error(err);
+      throw new IOException(err);
+    }
+
+    AsynchronousFileChannel file = null;
+    FileLock lock = null;
+
+    try {
+      file =
+          AsynchronousFileChannel.open(chunkFile.toPath(),
+              StandardOpenOption.CREATE,
+              StandardOpenOption.WRITE,
+              StandardOpenOption.SPARSE,
+              StandardOpenOption.SYNC);
+      lock = file.lock().get();
+      if (!chunkInfo.getChecksum().isEmpty()) {
+        verifyChecksum(chunkInfo, data, log);
+      }
+      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
+      if(size != data.length) {
+        log.error("Invalid write size found. Size:{}  Expected: {} " , size,
+            data.length);
+        throw new IOException("Invalid write size found. Size: " + size
+            + " Expected: " + data.length);
+      }
+    } finally {
+      if (lock != null) {
+        lock.release();
+      }
+      if (file != null) {
+        IOUtils.closeStream(file);
+      }
+    }
+  }
+
+  /**
+   * Verifies the checksum of a chunk against the data buffer.
+   *
+   * @param chunkInfo - Chunk Info.
+   * @param data      - data buffer
+   * @param log       - log
+   * @throws NoSuchAlgorithmException
+   * @throws IOException
+   */
+  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
+      log) throws NoSuchAlgorithmException, IOException {
+    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha.update(data);
+    if (!Hex.encodeHexString(sha.digest()).equals(
+        chunkInfo.getChecksum())) {
+      log.error("Checksum mismatch. Provided: {} , computed: {}",
+          chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
+      throw new IOException("Checksum mismatch. Provided: " +
+          chunkInfo.getChecksum() + " , computed: " +
+          DigestUtils.sha256Hex(sha.digest()));
+    }
+  }
+
+  /**
+   * Reads data from an existing chunk file.
+   *
+   * @param chunkFile - file where data lives.
+   * @param data      - chunk definition.
+   * @return ByteBuffer
+   * @throws IOException
+   * @throws ExecutionException
+   * @throws InterruptedException
+   */
+  public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
+      IOException, ExecutionException, InterruptedException,
+      NoSuchAlgorithmException {
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+    if (!chunkFile.exists()) {
+      log.error("Unable to find the chunk file. chunk info : {}",
+          data.toString());
+      throw new IOException("Unable to find the chunk file. chunk info " +
+          data.toString());
+    }
+
+    AsynchronousFileChannel file = null;
+    FileLock lock = null;
+    try {
+      file =
+          AsynchronousFileChannel.open(chunkFile.toPath(),
+              StandardOpenOption.READ);
+      lock = file.lock(data.getOffset(), data.getLen(), true).get();
+
+      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
+      file.read(buf, data.getOffset()).get();
+
+      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
+        verifyChecksum(data, buf.array(), log);
+      }
+
+      return buf;
+    } finally {
+      if (lock != null) {
+        lock.release();
+      }
+      if (file != null) {
+        IOUtils.closeStream(file);
+      }
+    }
+  }
+
+  /**
+   * Returns a generic chunk response. This is used by write and delete chunk
+   * calls, whose success responses carry no payload.
+   *
+   * @param msg Request
+   * @return Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+    return ContainerUtils.getContainerResponse(msg);
+  }
+
+  /**
+   * Gets a response to the read chunk calls.
+   * @param msg - Msg
+   * @param data  - Data
+   * @param info  - Info
+   * @return    Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg,
+                           byte[] data, ChunkInfo info) {
+    Preconditions.checkNotNull(msg);
+
+    ContainerProtos.ReadChunkReponseProto.Builder response =
+        ContainerProtos.ReadChunkReponseProto.newBuilder();
+    response.setChunkData(info.getProtoBufMessage());
+    response.setData(ByteString.copyFrom(data));
+    response.setPipeline(msg.getReadChunk().getPipeline());
+
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
+            .SUCCESS, "");
+    builder.setReadChunk(response);
+    return builder.build();
+  }
+}
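
A sketch of the write/read cycle these utilities support (illustrative
only; the file location is made up and the caller is assumed to handle the
declared exceptions). DigestUtils.sha256Hex produces the same hex string
that verifyChecksum compares against:

    byte[] data = "hello ozone".getBytes(StandardCharsets.UTF_8);
    ChunkInfo info = new ChunkInfo("chunk_0001", 0, data.length);
    info.setChecksum(DigestUtils.sha256Hex(data));   // hex SHA-256
    File chunkFile = new File("/tmp/container_data/chunk_0001");
    ChunkUtils.writeData(chunkFile, info, data);     // locks, verifies, writes
    ByteBuffer readBack = ChunkUtils.readData(chunkFile, info);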

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 79e9aeb..9300d06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -59,7 +59,8 @@ public final class ContainerUtils {
   /**
    * Returns a ReadContainer Response.
    *
-   * @param msg Request
+   * @param msg           Request
+   * @param containerData - data
    * @return Response.
    */
   public static ContainerProtos.ContainerCommandResponseProto
@@ -81,7 +82,9 @@ public final class ContainerUtils {
    * We found a command type but no associated payload for the command. Hence
    * return malformed Command as response.
    *
-   * @param msg - Protobuf message.
+   * @param msg     - Protobuf message.
+   * @param result  - result
+   * @param message - Error message.
    * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
    */
   public static ContainerProtos.ContainerCommandResponseProto.Builder
@@ -185,24 +188,97 @@ public final class ContainerUtils {
    * @throws IOException
    */
   public static Path createMetadata(Path containerPath) throws IOException {
+    Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
     Preconditions.checkNotNull(containerPath);
-    containerPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
-    if (!containerPath.toFile().mkdirs()) {
+    Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
+    if (!metadataPath.toFile().mkdirs()) {
+      log.error("Unable to create directory for metadata storage. Path: {}",
+          metadataPath);
      throw new IOException("Unable to create directory for metadata storage." +
-          " Path {}" + containerPath);
+          " Path: " + metadataPath);
     }
-    containerPath = containerPath.resolve(OzoneConsts.CONTAINER_DB);
-    LevelDBStore store = new LevelDBStore(containerPath.toFile(), true);
+    LevelDBStore store =
+        new LevelDBStore(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
+            .toFile(), true);
 
     // we close since the SCM pre-creates containers.
     // we will open and put Db handle into a cache when keys are being created
     // in a container.
 
     store.close();
-    return containerPath;
+
+    Path dataPath = containerPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
+    if (!dataPath.toFile().mkdirs()) {
+
+      // If we failed to create data directory, we cleanup the
+      // metadata directory completely. That is, we will delete the
+      // whole directory including LevelDB file.
+      log.error("Unable to create directory for data storage. cleaning up the" +
+              " container path: {} dataPath: {}",
+          containerPath, dataPath);
+      FileUtils.deleteDirectory(containerPath.toFile());
+      throw new IOException("Unable to create directory for data storage." +
+          " Path: " + dataPath);
+    }
+    return metadataPath;
+  }
+
+  /**
+   * Returns Metadata location.
+   *
+   * @param containerData - Data
+   * @param location      - Path
+   * @return Path
+   */
+  public static File getMetadataFile(ContainerData containerData,
+                                     Path location) {
+    return location.resolve(containerData
+        .getContainerName().concat(CONTAINER_META))
+        .toFile();
+  }
+
+  /**
+   * Returns container file location.
+   * @param containerData  - Data
+   * @param location - Root path
+   * @return Path
+   */
+  public static File getContainerFile(ContainerData containerData,
+                                      Path location) {
+    return location.resolve(containerData
+        .getContainerName().concat(CONTAINER_EXTENSION))
+        .toFile();
+  }
+
+  /**
+   * Container metadata directory -- here is where the level DB lives.
+   * @param cData - cData.
+   * @return Path to the parent directory where the DB lives.
+   */
+  public static Path getMetadataDirectory(ContainerData cData) {
+    Path dbPath = Paths.get(cData.getDBPath());
+    Preconditions.checkNotNull(dbPath);
+    Preconditions.checkState(dbPath.toString().length() > 0);
+    return dbPath.getParent();
   }
 
   /**
+   * Returns the path where data or chunks live for a given container.
+   * @param cData - cData container
+   * @return - Path
+   */
+  public static Path getDataDirectory(ContainerData cData) throws IOException {
+    Path path = getMetadataDirectory(cData);
+    Preconditions.checkNotNull(path);
+    path = path.getParent();
+    if(path == null) {
+      throw new IOException("Unable to get Data directory. null path found");
+    }
+    return path.resolve(OzoneConsts.CONTAINER_DATA_PATH);
+  }
+
+
+  /**
    * remove Container if it is empty.
    * <p/>
    * There are three things we need to delete.

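For orientation, the directory layout these helpers imply (an inference
from the code above, with the constants' names used as placeholders) and
the accessors that navigate it:

    // <containerPath>/<CONTAINER_META_PATH>/container.db  <- ContainerData.getDBPath()
    // <containerPath>/<CONTAINER_DATA_PATH>/<chunkName>   <- chunk files
    Path metaDir = ContainerUtils.getMetadataDirectory(cData); // parent of the DB
    Path dataDir = ContainerUtils.getDataDirectory(cData);     // sibling data dir
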
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java
index 140341c..c1ec48d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java
@@ -58,9 +58,8 @@ public class Pipeline {
     for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
       newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
     }
-    if (pipeline.hasContainerName()) {
-      newPipeline.containerName = newPipeline.getContainerName();
-    }
+
+    newPipeline.setContainerName(pipeline.getContainerName());
     return newPipeline;
   }
 
@@ -105,9 +104,7 @@ public class Pipeline {
       builder.addMembers(datanode.getProtoBufMessage());
     }
     builder.setLeaderID(leaderID);
-    if (this.containerName != null) {
-      builder.setContainerName(this.containerName);
-    }
+    builder.setContainerName(this.containerName);
     return builder.build();
   }
 
@@ -128,5 +125,4 @@ public class Pipeline {
   public void setContainerName(String containerName) {
     this.containerName = containerName;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
index fe7e37a..21f31e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
@@ -17,5 +17,6 @@
  */
 package org.apache.hadoop.ozone.container.common.helpers;
 /**
- Contains protocol buffer helper classes.
+ Contains protocol buffer helper classes and utilities used by the
+ container implementation.
  **/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
new file mode 100644
index 0000000..090d659
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * An implementation of ChunkManager that is used by default in ozone.
+ */
+public class ChunkManagerImpl implements ChunkManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+  private final ContainerManager containerManager;
+
+  /**
+   * Constructs a ChunkManager.
+   *
+   * @param manager - ContainerManager.
+   */
+  public ChunkManagerImpl(ContainerManager manager) {
+    this.containerManager = manager;
+  }
+
+  /**
+   * writes a given chunk.
+   *
+   * @param pipeline - Name and the set of machines that make this container.
+   * @param keyName  - Name of the Key.
+   * @param info     - ChunkInfo.
+   * @throws IOException
+   */
+  @Override
+  public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
+                         byte[] data)
+      throws IOException {
+
+    // we don't want container manager to go away while we are writing chunks.
+    containerManager.readLock();
+
+    // TODO : Take keyManager Write lock here.
+    try {
+      Preconditions.checkNotNull(pipeline);
+      Preconditions.checkNotNull(pipeline.getContainerName());
+      File chunkFile = ChunkUtils.validateChunk(pipeline,
+          containerManager.readContainer(pipeline.getContainerName()), info);
+      ChunkUtils.writeData(chunkFile, info, data);
+
+    } catch (ExecutionException |
+        NoSuchAlgorithmException e) {
+      LOG.error("write data failed. error: {}", e);
+      throw new IOException("Internal error: ", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("write data failed. error: {}", e);
+      throw new IOException("Internal error: ", e);
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * reads the data defined by a chunk.
+   *
+   * @param pipeline - container pipeline.
+   * @param keyName  - Name of the Key
+   * @param info     - ChunkInfo.
+   * @return byte array
+   * @throws IOException TODO: Right now we do not support partial reads and
+   *                     writes of chunks. TODO: Explore if we need to do that
+   *                     for ozone.
+   */
+  @Override
+  public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+      throws IOException {
+    containerManager.readLock();
+    try {
+      File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
+          .readContainer(pipeline.getContainerName()), info);
+      return ChunkUtils.readData(chunkFile, info).array();
+    } catch (ExecutionException | NoSuchAlgorithmException e) {
+      LOG.error("read data failed. error: {}", e);
+      throw new IOException("Internal error: ", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("read data failed. error: {}", e);
+      throw new IOException("Internal error: ", e);
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * Deletes a given chunk.
+   *
+   * @param pipeline - Pipeline.
+   * @param keyName  - Key Name
+   * @param info     - Chunk Info
+   * @throws IOException
+   */
+  @Override
+  public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+      throws IOException {
+
+    containerManager.readLock();
+    try {
+      File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
+          .readContainer(pipeline.getContainerName()), info);
+      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
+        FileUtil.fullyDelete(chunkFile);
+      } else {
+        LOG.error("Not Supported Operation. Trying to delete a " +
+            "chunk that is in shared file. chunk info : " + info.toString());
+        throw new IOException("Not Supported Operation. Trying to delete a " +
+            "chunk that is in shared file. chunk info : " + info.toString());
+      }
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+}
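
A hypothetical end-to-end use of the manager (not part of the commit;
"manager" is assumed to be an initialized ContainerManager whose container
already exists for the given pipeline, and the key and chunk names are
placeholders):

    static byte[] writeThenRead(ContainerManager manager, Pipeline pipeline,
                                byte[] data) throws IOException {
      ChunkManager chunks = new ChunkManagerImpl(manager);
      ChunkInfo info = new ChunkInfo("chunk_0001", 0, data.length);
      chunks.writeChunk(pipeline, "key_0001", info, data);
      byte[] readBack = chunks.readChunk(pipeline, "key_0001", info);
      chunks.deleteChunk(pipeline, "key_0001", info);
      return readBack;
    }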

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index 599b2f2..1d6a695 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -26,9 +26,11 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.slf4j.Logger;
@@ -69,6 +71,7 @@ public class ContainerManagerImpl implements ContainerManager {
   // for waiting threads.
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
   private ContainerLocationManager locationManager;
+  private ChunkManager chunkManager;
 
   /**
    * Init call that sets up a container Manager.
@@ -141,7 +144,7 @@ public class ContainerManagerImpl implements ContainerManager {
 
       metaStream = new FileInputStream(metaFileName);
 
-      MessageDigest sha = MessageDigest.getInstance("SHA-256");
+      MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
 
       dis = new DigestInputStream(containerStream, sha);
 
@@ -238,13 +241,9 @@ public class ContainerManagerImpl implements ContainerManager {
     FileOutputStream metaStream = null;
     Path location = locationManager.getContainerPath();
 
-    File containerFile = location.resolve(containerData
-        .getContainerName().concat(CONTAINER_EXTENSION))
-        .toFile();
-
-    File metadataFile = location.resolve(containerData
-        .getContainerName().concat(CONTAINER_META))
-        .toFile();
+    File containerFile = ContainerUtils.getContainerFile(containerData,
+        location);
+    File metadataFile = ContainerUtils.getMetadataFile(containerData, location);
 
     try {
       ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
@@ -255,10 +254,11 @@ public class ContainerManagerImpl implements ContainerManager {
 
       containerStream = new FileOutputStream(containerFile);
       metaStream = new FileOutputStream(metadataFile);
-      MessageDigest sha = MessageDigest.getInstance("SHA-256");
+      MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
 
       dos = new DigestOutputStream(containerStream, sha);
-      containerData.setDBPath(metadataPath.toString());
+      containerData.setDBPath(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
+          .toString());
       containerData.setContainerPath(containerFile.toString());
 
       ContainerProtos.ContainerData protoData = containerData
@@ -293,6 +293,8 @@ public class ContainerManagerImpl implements ContainerManager {
     }
   }
 
+
+
   /**
    * Deletes an existing container.
    *
@@ -368,6 +370,10 @@ public class ContainerManagerImpl implements ContainerManager {
    */
   @Override
   public ContainerData readContainer(String containerName) throws IOException {
+    if(!containerMap.containsKey(containerName)) {
+      throw new IOException("Unable to find the container. Name: "
+          + containerName);
+    }
     return containerMap.get(containerName).getContainer();
   }
 
@@ -447,6 +453,18 @@ public class ContainerManagerImpl implements ContainerManager {
   }
 
   /**
+   * Sets the chunk Manager.
+   * @param chunkManager - ChunkManager.
+   */
+  public void setChunkManager(ChunkManager chunkManager) {
+    this.chunkManager = chunkManager;
+  }
+
+  public ChunkManager getChunkManager() {
+    return this.chunkManager;
+  }
+
+  /**
    * Filter out only container files from the container metadata dir.
    */
   private static class ContainerFilter implements FilenameFilter {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index 7a45557..d39b709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
@@ -68,6 +70,12 @@ public class Dispatcher implements ContainerDispatcher {
     }
 
 
+    if ((cmdType == Type.WriteChunk) ||
+        (cmdType == Type.ReadChunk) ||
+        (cmdType == Type.DeleteChunk)) {
+      return chunkProcessHandler(msg);
+    }
+
     return ContainerUtils.unsupportedRequest(msg);
   }
 
@@ -81,28 +89,66 @@ public class Dispatcher implements ContainerDispatcher {
   private ContainerCommandResponseProto containerProcessHandler(
       ContainerCommandRequestProto msg) throws IOException {
     try {
-      ContainerData cData = ContainerData.getFromProtBuf(
-          msg.getCreateContainer().getContainerData());
-
-      Pipeline pipeline = Pipeline.getFromProtoBuf(
-          msg.getCreateContainer().getPipeline());
-      Preconditions.checkNotNull(pipeline);
 
       switch (msg.getCmdType()) {
       case CreateContainer:
-        return handleCreateContainer(msg, cData, pipeline);
+        return handleCreateContainer(msg);
 
       case DeleteContainer:
-        return handleDeleteContainer(msg, cData, pipeline);
+        return handleDeleteContainer(msg);
 
       case ListContainer:
+        // TODO : Support List Container.
         return ContainerUtils.unsupportedRequest(msg);
 
       case UpdateContainer:
+        // TODO : Support Update Container.
         return ContainerUtils.unsupportedRequest(msg);
 
       case ReadContainer:
-        return handleReadContainer(msg, cData);
+        return handleReadContainer(msg);
+
+      default:
+        return ContainerUtils.unsupportedRequest(msg);
+      }
+    } catch (IOException ex) {
+      LOG.warn("Container operation failed. " +
+              "Container: {} Operation: {}  trace ID: {} Error: {}",
+          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCmdType().name(),
+          msg.getTraceID(),
+          ex.toString());
+
+      // TODO : Replace with finer error codes.
+      return ContainerUtils.getContainerResponse(msg,
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
+          ex.toString()).build();
+    }
+  }
+
+  /**
+   * Handles all chunk-related functionality.
+   *
+   * @param msg - command
+   * @return - response
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto chunkProcessHandler(
+      ContainerCommandRequestProto msg) throws IOException {
+    try {
+
+      switch (msg.getCmdType()) {
+      case WriteChunk:
+        return handleWriteChunk(msg);
+
+      case ReadChunk:
+        return handleReadChunk(msg);
+
+      case DeleteChunk:
+        return handleDeleteChunk(msg);
+
+      case ListChunk:
+        return ContainerUtils.unsupportedRequest(msg);
 
       default:
         return ContainerUtils.unsupportedRequest(msg);
@@ -125,13 +171,12 @@ public class Dispatcher implements ContainerDispatcher {
   /**
    * Calls into container logic and returns appropriate response.
    *
-   * @param msg   - Request
-   * @param cData - Container Data object
+   * @param msg - Request
    * @return ContainerCommandResponseProto
    * @throws IOException
    */
   private ContainerCommandResponseProto handleReadContainer(
-      ContainerCommandRequestProto msg, ContainerData cData)
+      ContainerCommandRequestProto msg)
       throws IOException {
 
     if (!msg.hasReadContainer()) {
@@ -139,51 +184,147 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    ContainerData container = this.containerManager.readContainer(
-        cData.getContainerName());
+
+    String name = msg.getReadContainer().getName();
+    ContainerData container = this.containerManager.readContainer(name);
     return ContainerUtils.getReadContainerResponse(msg, container);
   }
 
   /**
    * Calls into container logic and returns appropriate response.
    *
-   * @param msg      - Request
-   * @param cData    - ContainerData
-   * @param pipeline - Pipeline is the machines where this container lives.
+   * @param msg - Request
    * @return Response.
    * @throws IOException
    */
   private ContainerCommandResponseProto handleDeleteContainer(
-      ContainerCommandRequestProto msg, ContainerData cData,
-      Pipeline pipeline) throws IOException {
+      ContainerCommandRequestProto msg) throws IOException {
+
     if (!msg.hasDeleteContainer()) {
       LOG.debug("Malformed delete container request. trace ID: {}",
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    this.containerManager.deleteContainer(pipeline,
-        cData.getContainerName());
+
+    String name = msg.getDeleteContainer().getName();
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getDeleteContainer().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+
+    this.containerManager.deleteContainer(pipeline, name);
     return ContainerUtils.getContainerResponse(msg);
   }
 
   /**
    * Calls into container logic and returns appropriate response.
    *
-   * @param msg      - Request
-   * @param cData    - ContainerData
-   * @param pipeline - Pipeline is the machines where this container lives.
+   * @param msg - Request
    * @return Response.
    * @throws IOException
    */
   private ContainerCommandResponseProto handleCreateContainer(
-      ContainerCommandRequestProto msg, ContainerData cData,
-      Pipeline pipeline) throws IOException {
+      ContainerCommandRequestProto msg) throws IOException {
     if (!msg.hasCreateContainer()) {
       LOG.debug("Malformed create container request. trace ID: {}",
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
+    ContainerData cData = ContainerData.getFromProtBuf(
+        msg.getCreateContainer().getContainerData());
+    Preconditions.checkNotNull(cData);
+
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getCreateContainer().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+
     this.containerManager.createContainer(pipeline, cData);
     return ContainerUtils.getContainerResponse(msg);
   }
+
+  /**
+   * Calls into chunk manager to write a chunk.
+   *
+   * @param msg - Request.
+   * @return Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleWriteChunk(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasWriteChunk()) {
+      LOG.debug("Malformed write chunk request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    String keyName = msg.getWriteChunk().getKeyName();
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getWriteChunk().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
+        .getChunkData());
+    Preconditions.checkNotNull(chunkInfo);
+    byte[] data = msg.getWriteChunk().getData().toByteArray();
+    this.containerManager.getChunkManager().writeChunk(pipeline, keyName,
+        chunkInfo, data);
+    return ChunkUtils.getChunkResponse(msg);
+  }
+
+  /**
+   * Calls into chunk manager to read a chunk.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleReadChunk(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasReadChunk()) {
+      LOG.debug("Malformed read chunk request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    String keyName = msg.getReadChunk().getKeyName();
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getReadChunk().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk()
+        .getChunkData());
+    Preconditions.checkNotNull(chunkInfo);
+    byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
+        keyName, chunkInfo);
+    return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
+  }
+
+  /**
+   * Calls into chunk manager to delete a chunk.
+   *
+   * @param msg - Request.
+   * @return Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleDeleteChunk(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasDeleteChunk()) {
+      LOG.debug("Malformed delete chunk request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    String keyName = msg.getDeleteChunk().getKeyName();
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getDeleteChunk().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
+        .getChunkData());
+    Preconditions.checkNotNull(chunkInfo);
+
+    this.containerManager.getChunkManager().deleteChunk(pipeline, keyName,
+        chunkInfo);
+    return ChunkUtils.getChunkResponse(msg);
+  }
+
 }
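
A sketch of the WriteChunk request a client would hand to this dispatcher
(illustrative; the pipeline, data, names and trace ID are placeholders, and
"dispatcher" stands for the ContainerDispatcher entry point):

    ContainerProtos.WriteChunkRequestProto writeChunk =
        ContainerProtos.WriteChunkRequestProto.newBuilder()
            .setPipeline(pipeline.getProtoBufMessage())
            .setKeyName("key_0001")
            .setChunkData(info.getProtoBufMessage())
            .setData(ByteString.copyFrom(data))
            .build();
    ContainerCommandRequestProto request =
        ContainerCommandRequestProto.newBuilder()
            .setCmdType(Type.WriteChunk)
            .setTraceID("trace_0001")
            .setWriteChunk(writeChunk)
            .build();
    ContainerCommandResponseProto response = dispatcher.dispatch(request);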

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
new file mode 100644
index 0000000..19af452
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+
+import java.io.IOException;
+
+/**
+ * Chunk Manager allows read, write, delete and listing of chunks in
+ * a container.
+ */
+public interface ChunkManager {
+
+  /**
+   * writes a given chunk.
+   * @param pipeline - Name and the set of machines that make this container.
+   * @param keyName - Name of the Key.
+   * @param info - ChunkInfo.
+   * @throws IOException
+   */
+  void writeChunk(Pipeline pipeline, String keyName,
+                  ChunkInfo info, byte[] data) throws IOException;
+
+  /**
+   * reads the data defined by a chunk.
+   * @param pipeline - container pipeline.
+   * @param keyName - Name of the Key
+   * @param info - ChunkInfo.
+   * @return  byte array
+   * @throws IOException
+   *
+   * TODO: Right now we do not support partial reads and writes of chunks.
+   * TODO: Explore if we need to do that for ozone.
+   */
+  byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
+      IOException;
+
+  /**
+   * Deletes a given chunk.
+   * @param pipeline  - Pipeline.
+   * @param keyName   - Key Name
+   * @param info  - Chunk Info
+   * @throws IOException
+   */
+  void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
+      IOException;
+
+  // TODO : Support list operations.
+
+}
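
To make the contract concrete, a toy in-memory implementation (purely
illustrative; the real implementation is ChunkManagerImpl, which persists
chunks to files):

    // Keys chunks by keyName + chunkName and ignores offsets; enough to
    // show the shape of the interface, nothing more.
    class InMemoryChunkManager implements ChunkManager {
      private final java.util.Map<String, byte[]> store =
          new java.util.concurrent.ConcurrentHashMap<>();

      @Override
      public void writeChunk(Pipeline pipeline, String keyName,
                             ChunkInfo info, byte[] data) throws IOException {
        store.put(keyName + "/" + info.getChunkName(), data.clone());
      }

      @Override
      public byte[] readChunk(Pipeline pipeline, String keyName,
                              ChunkInfo info) throws IOException {
        byte[] data = store.get(keyName + "/" + info.getChunkName());
        if (data == null) {
          throw new IOException("No such chunk: " + info.getChunkName());
        }
        return data.clone();
      }

      @Override
      public void deleteChunk(Pipeline pipeline, String keyName,
                              ChunkInfo info) throws IOException {
        store.remove(keyName + "/" + info.getChunkName());
      }
    }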

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index a0ddcd9..eba5d9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -97,4 +97,17 @@ public interface ContainerManager extends RwLock {
    */
   void shutdown() throws IOException;
 
+  /**
+   * Sets the Chunk Manager.
+   * @param chunkManager - ChunkManager.
+   */
+  void setChunkManager(ChunkManager chunkManager);
+
+  /**
+   * Gets the Chunk Manager.
+   * @return  ChunkManager.
+   */
+  ChunkManager getChunkManager();
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index cc85a24..b793f47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -21,8 +21,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
@@ -48,6 +50,7 @@ public class OzoneContainer {
   private final ContainerDispatcher dispatcher;
   private final ContainerManager manager;
   private final XceiverServer server;
+  private final ChunkManager chunkManager;
 
   /**
    * Creates a network endpoint and enables Ozone container.
@@ -74,6 +77,8 @@ public class OzoneContainer {
 
     manager = new ContainerManagerImpl();
     manager.init(this.ozoneConfig, locations, this.dataSet);
+    this.chunkManager = new ChunkManagerImpl(manager);
+    manager.setChunkManager(this.chunkManager);
 
     this.dispatcher = new Dispatcher(manager);
     server = new XceiverServer(this.ozoneConfig, this.dispatcher);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto
index 38a378f..4bb53e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto
@@ -157,7 +157,7 @@ message ContainerCommandResponseProto {
 message Pipeline {
   required string leaderID = 1;
   repeated DatanodeIDProto members = 2;
-  optional string containerName = 3;
+  required string containerName = 3;
 }
 
 message KeyValue {
@@ -244,7 +244,7 @@ message  ReadKeyRequestProto  {
 
 message  ReadKeyResponeProto  {
   repeated KeyValue metadata = 1;
-  repeated chunkInfo chunkData = 2;
+  repeated ChunkInfo chunkData = 2;
 }
 
 message  UpdateKeyRequestProto {
@@ -276,19 +276,19 @@ message  ListKeyResponeProto {
 
 // Chunk Operations
 
-message chunkInfo {
-  required uint64 offset = 1;
-  required uint64 len = 2;
-  optional uint64 checksum = 3;
-  repeated KeyValue metadata = 4;
+message ChunkInfo {
+  required string chunkName = 1;
+  required uint64 offset = 2;
+  required uint64 len = 3;
+  optional string checksum = 4;
+  repeated KeyValue metadata = 5;
 }
 
 message  WriteChunkRequestProto  {
   required Pipeline pipeline = 1;
-  required string containerName = 2;
-  required string keyName = 3;
-  required chunkInfo chunkData = 4;
-  repeated bytes data = 5;
+  required string keyName = 2;
+  required ChunkInfo chunkData = 3;
+  required bytes data = 4;
 }
 
 message  WriteChunkReponseProto {
@@ -296,22 +296,20 @@ message  WriteChunkReponseProto {
 
 message  ReadChunkRequestProto  {
   required Pipeline pipeline = 1;
-  required string containerName = 2;
-  required string keyName = 3;
-  required chunkInfo chunkData = 4;
+  required string keyName = 2;
+  required ChunkInfo chunkData = 3;
 }
 
 message  ReadChunkReponseProto {
   required Pipeline pipeline = 1;
-  required chunkInfo chunkData = 2;
-  repeated bytes data = 3;
+  required ChunkInfo chunkData = 2;
+  required bytes data = 3;
 }
 
 message  DeleteChunkRequestProto {
   required Pipeline pipeline = 1;
-  required string containerName = 2;
-  required string keyName = 3;
-  required chunkInfo chunkData = 4;
+  required string keyName = 2;
+  required ChunkInfo chunkData = 3;
 }
 
 message  DeleteChunkResponseProto {
@@ -319,13 +317,12 @@ message  DeleteChunkResponseProto {
 
 message  ListChunkRequestProto {
   required Pipeline pipeline = 1;
-  required string containerName = 2;
-  required string keyName = 3;
-  required uint64 startOffset = 4;
-  required uint32 count = 5;
+  required string keyName = 2;
+  required string prevChunkName = 3;
+  required uint32 count = 4;
 }
 
 message  ListChunkResponseProto {
-  repeated chunkInfo chunkData = 1;
+  repeated ChunkInfo chunkData = 1;
 }
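
Two things to note in the proto changes above: chunkInfo is renamed to
ChunkInfo and gains a chunkName plus a string (rather than uint64) checksum,
and chunk listing now resumes by chunk name (prevChunkName) instead of a byte
offset, with containerName dropped from the chunk requests because the now
required Pipeline.containerName carries it. A rough sketch of building these
messages, assuming the generated ContainerProtos classes from this file:

    import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;

    final class ChunkProtoSketch {
      // Builds the renamed ChunkInfo message; the "<key>.data.<seqNo>" naming
      // follows the convention used by the test helpers later in this patch.
      static ContainerProtos.ChunkInfo chunk(String keyName, int seqNo,
          long offset, long len, String sha256Hex) {
        return ContainerProtos.ChunkInfo.newBuilder()
            .setChunkName(String.format("%s.data.%d", keyName, seqNo))
            .setOffset(offset)
            .setLen(len)
            .setChecksum(sha256Hex)   // optional, hex-encoded SHA-256
            .build();
      }

      // Name-based listing: resume after the last chunk name seen, instead of
      // the old startOffset/containerName pair.
      static ContainerProtos.ListChunkRequestProto listAfter(
          ContainerProtos.Pipeline pipeline, String keyName, String prevChunk) {
        return ContainerProtos.ListChunkRequestProto.newBuilder()
            .setPipeline(pipeline)    // pipeline now carries containerName
            .setKeyName(keyName)
            .setPrevChunkName(prevChunk)
            .setCount(100)            // page size
            .build();
      }
    }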
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 0d1b269..bef290c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -18,22 +18,32 @@
 
 package org.apache.hadoop.ozone.container;
 
+import com.google.protobuf.ByteString;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOExceptionWithCause;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Random;
 import java.util.UUID;
 
 /**
  * Helpers for container tests.
  */
 public class ContainerTestHelper {
+  private static Random r = new Random();
 
   /**
    * Create a pipeline with single node replica.
@@ -41,7 +51,8 @@ public class ContainerTestHelper {
    * @return Pipeline with single node in it.
    * @throws IOException
    */
-  public static Pipeline createSingleNodePipeline() throws IOException {
+  public static Pipeline createSingleNodePipeline(String containerName) throws
+      IOException {
     ServerSocket socket = new ServerSocket(0);
     int port = socket.getLocalPort();
     DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
@@ -50,26 +61,160 @@ public class ContainerTestHelper {
     datanodeID.setContainerPort(port);
     Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
     pipeline.addMember(datanodeID);
+    pipeline.setContainerName(containerName);
     socket.close();
     return pipeline;
   }
 
   /**
+   * Creates a ChunkInfo for testing.
+   *
+   * @param keyName - Name of the key.
+   * @param seqNo   - Chunk number.
+   * @param offset  - Offset within the key's data where this chunk begins.
+   * @param len     - Length of the chunk in bytes.
+   * @return ChunkInfo
+   * @throws IOException
+   */
+  public static ChunkInfo getChunk(String keyName, int seqNo, long offset,
+                                   long len) throws IOException {
+
+    ChunkInfo info = new ChunkInfo(String.format("%s.data.%d", keyName,
+        seqNo), offset, len);
+    return info;
+  }
+
+  /**
+   * Generates random data of the requested length.
+   *
+   * @param len - Number of bytes.
+   * @return byte array with valid data.
+   */
+  public static byte[] getData(int len) {
+    byte[] data = new byte[len];
+    r.nextBytes(data);
+    return data;
+  }
+
+  /**
+   * Computes the SHA-256 hash of the data and sets it as the chunk checksum.
+   *
+   * @param info - chunk info.
+   * @param data - data array
+   * @throws NoSuchAlgorithmException
+   */
+  public static void setDataChecksum(ChunkInfo info, byte[] data)
+      throws NoSuchAlgorithmException {
+    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha.update(data);
+    info.setChecksum(Hex.encodeHexString(sha.digest()));
+  }
+
+  /**
+   * Returns a writeChunk Request.
+   *
+   * @param pipeline      - Pipeline for the container.
+   * @param containerName - Name of the container.
+   * @param keyName       - Name of the key.
+   * @param datalen       - Length of the chunk data in bytes.
+   * @return Request.
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  public static ContainerCommandRequestProto getWriteChunkRequest(
+      Pipeline pipeline, String containerName, String keyName, int datalen)
+      throws
+      IOException, NoSuchAlgorithmException {
+    ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
+        ContainerProtos.WriteChunkRequestProto
+            .newBuilder();
+
+    pipeline.setContainerName(containerName);
+    writeRequest.setPipeline(pipeline.getProtobufMessage());
+    writeRequest.setKeyName(keyName);
+
+    byte[] data = getData(datalen);
+    ChunkInfo info = getChunk(keyName, 0, 0, datalen);
+    setDataChecksum(info, data);
+
+    writeRequest.setChunkData(info.getProtoBufMessage());
+    writeRequest.setData(ByteString.copyFrom(data));
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.WriteChunk);
+    request.setWriteChunk(writeRequest);
+    return request.build();
+  }
+
+  /**
+   * Returns a read Request.
+   *
+   * @param request writeChunkRequest.
+   * @return Request.
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  public static ContainerCommandRequestProto getReadChunkRequest(
+      ContainerProtos.WriteChunkRequestProto request)
+      throws
+      IOException, NoSuchAlgorithmException {
+    ContainerProtos.ReadChunkRequestProto.Builder readRequest =
+        ContainerProtos.ReadChunkRequestProto.newBuilder();
+
+    readRequest.setPipeline(request.getPipeline());
+
+    readRequest.setKeyName(request.getKeyName());
+    readRequest.setChunkData(request.getChunkData());
+
+    ContainerCommandRequestProto.Builder newRequest =
+        ContainerCommandRequestProto.newBuilder();
+    newRequest.setCmdType(ContainerProtos.Type.ReadChunk);
+    newRequest.setReadChunk(readRequest);
+    return newRequest.build();
+  }
+
+  /**
+   * Returns a delete Request.
+   *
+   * @param writeRequest - write request
+   * @return request
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  public static ContainerCommandRequestProto getDeleteChunkRequest(
+      ContainerProtos.WriteChunkRequestProto writeRequest)
+      throws
+      IOException, NoSuchAlgorithmException {
+    ContainerProtos.DeleteChunkRequestProto.Builder deleteRequest =
+        ContainerProtos.DeleteChunkRequestProto
+            .newBuilder();
+
+    deleteRequest.setPipeline(writeRequest.getPipeline());
+    deleteRequest.setChunkData(writeRequest.getChunkData());
+    deleteRequest.setKeyName(writeRequest.getKeyName());
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.DeleteChunk);
+    request.setDeleteChunk(deleteRequest);
+    return request.build();
+  }
+
+  /**
    * Returns a create container command for test purposes. There are a bunch of
    * tests where we need to just send a request and get a reply.
    *
    * @return ContainerCommandRequestProto.
    */
-  public static ContainerCommandRequestProto getCreateContainerRequest() throws
-      IOException {
+  public static ContainerCommandRequestProto getCreateContainerRequest(
+      String containerName) throws IOException {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto
             .newBuilder();
     ContainerProtos.ContainerData.Builder containerData = ContainerProtos
         .ContainerData.newBuilder();
-    containerData.setName("testContainer");
+    containerData.setName(containerName);
     createRequest.setPipeline(
-        ContainerTestHelper.createSingleNodePipeline().getProtobufMessage());
+        ContainerTestHelper.createSingleNodePipeline(containerName)
+            .getProtobufMessage());
     createRequest.setContainerData(containerData.build());
 
     ContainerCommandRequestProto.Builder request =
@@ -86,7 +231,7 @@ public class ContainerTestHelper {
    * @return ContainerCommandRequestProto.
    */
   public static ContainerCommandResponseProto
-  getCreateContainerResponse(ContainerCommandRequestProto request) throws
+    getCreateContainerResponse(ContainerCommandRequestProto request) throws
       IOException {
     ContainerProtos.CreateContainerResponseProto.Builder createResponse =
         ContainerProtos.CreateContainerResponseProto.newBuilder();
@@ -99,6 +244,4 @@ public class ContainerTestHelper {
     response.setResult(ContainerProtos.Result.SUCCESS);
     return response.build();
   }
-
-
 }
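
setDataChecksum above covers only the write side; the matching read-side
check is left to the callers. A sketch of that verification under the same
SHA-256/hex convention (not part of this patch, purely illustrative):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import org.apache.commons.codec.binary.Hex;
    import org.apache.hadoop.ozone.OzoneConsts;
    import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;

    final class ChecksumSketch {
      // Recomputes the digest of the data just read and compares it with the
      // hex string stored in the ChunkInfo by setDataChecksum.
      static boolean matches(ChunkInfo info, byte[] data)
          throws NoSuchAlgorithmException {
        MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
        sha.update(data);
        return Hex.encodeHexString(sha.digest()).equals(info.getChecksum());
      }
    }

This is the same comparison the tests below perform inline in
testWriteReadManyChunks and testMultipleWriteSingleRead.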

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 3b498e2..213019e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -18,14 +18,17 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
+import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.junit.After;
@@ -33,13 +36,19 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -47,6 +56,10 @@ import java.util.Map;
 
 import static org.apache.hadoop.ozone.container.ContainerTestHelper
     .createSingleNodePipeline;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper
+    .setDataChecksum;
 import static org.junit.Assert.fail;
 
 /**
@@ -56,11 +69,15 @@ public class TestContainerPersistence {
 
   static String path;
   static ContainerManagerImpl containerManager;
+  static ChunkManagerImpl chunkManager;
   static OzoneConfiguration conf;
   static FsDatasetSpi fsDataSet;
   static MiniDFSCluster cluster;
   static List<Path> pathLists = new LinkedList<>();
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   @BeforeClass
   public static void init() throws IOException {
     conf = new OzoneConfiguration();
@@ -84,6 +101,9 @@ public class TestContainerPersistence {
     cluster.waitActive();
     fsDataSet = cluster.getDataNodes().get(0).getFSDataset();
     containerManager = new ContainerManagerImpl();
+    chunkManager = new ChunkManagerImpl(containerManager);
+    containerManager.setChunkManager(chunkManager);
+
   }
 
   @AfterClass
@@ -115,7 +135,8 @@ public class TestContainerPersistence {
     ContainerData data = new ContainerData(containerName);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName),
+        data);
     Assert.assertTrue(containerManager.getContainerMap()
         .containsKey(containerName));
     ContainerManagerImpl.ContainerStatus status = containerManager
@@ -132,14 +153,11 @@ public class TestContainerPersistence {
     String containerPathString = ContainerUtils.getContainerNameFromFile(new
         File(status.getContainer().getContainerPath()));
 
-    Path meta = Paths.get(containerPathString);
-
-    String metadataFile = meta.toString() + OzoneConsts.CONTAINER_META;
-    Assert.assertTrue(new File(metadataFile).exists());
+    Path meta = Paths.get(status.getContainer().getDBPath()).getParent();
+    Assert.assertTrue(Files.exists(meta));
 
 
     String dbPath = status.getContainer().getDBPath();
-
     LevelDBStore store = null;
     try {
       store = new LevelDBStore(new File(dbPath), false);
@@ -158,9 +176,9 @@ public class TestContainerPersistence {
     ContainerData data = new ContainerData(containerName);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName), data);
     try {
-      containerManager.createContainer(createSingleNodePipeline(), data);
+      containerManager.createContainer(createSingleNodePipeline(containerName), data);
       fail("Expected Exception not thrown.");
     } catch (IOException ex) {
       Assert.assertNotNull(ex);
@@ -176,12 +194,12 @@ public class TestContainerPersistence {
     ContainerData data = new ContainerData(containerName1);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName1), data);
 
     data = new ContainerData(containerName2);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName2), data);
 
 
     Assert.assertTrue(containerManager.getContainerMap()
@@ -189,7 +207,7 @@ public class TestContainerPersistence {
     Assert.assertTrue(containerManager.getContainerMap()
         .containsKey(containerName2));
 
-    containerManager.deleteContainer(createSingleNodePipeline(),
+    containerManager.deleteContainer(createSingleNodePipeline(containerName1),
         containerName1);
     Assert.assertFalse(containerManager.getContainerMap()
         .containsKey(containerName1));
@@ -200,7 +218,7 @@ public class TestContainerPersistence {
     data = new ContainerData(containerName1);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName1), data);
 
     // Assert we still have both containers.
     Assert.assertTrue(containerManager.getContainerMap()
@@ -228,7 +246,7 @@ public class TestContainerPersistence {
       ContainerData data = new ContainerData(containerName);
       data.addMetadata("VOLUME", "shire");
       data.addMetadata("owner)", "bilbo");
-      containerManager.createContainer(createSingleNodePipeline(), data);
+      containerManager.createContainer(createSingleNodePipeline(containerName), data);
       testMap.put(containerName, data);
     }
 
@@ -251,6 +269,204 @@ public class TestContainerPersistence {
     // Assert that we listed all the keys that we had put into
     // container.
     Assert.assertTrue(testMap.isEmpty());
+  }
+
+  /**
+   * Writes a single chunk.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testWriteChunk() throws IOException, NoSuchAlgorithmException {
+    final int datalen = 1024;
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+
+    pipeline.setContainerName(containerName);
+    ContainerData cData = new ContainerData(containerName);
+    cData.addMetadata("VOLUME", "shire");
+    cData.addMetadata("owner)", "bilbo");
+    containerManager.createContainer(pipeline, cData);
+    ChunkInfo info = getChunk(keyName, 0, 0, datalen);
+    byte[] data = getData(datalen);
+    setDataChecksum(info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data);
+  }
+
+  /**
+   * Writes many chunks of the same key into different chunk files and verifies
+   * that we have that data in many files.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testWriteReadManyChunks() throws IOException,
+      NoSuchAlgorithmException {
+    final int datalen = 1024;
+    final int chunkCount = 1024;
+
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+    Map<String, ChunkInfo> fileHashMap = new HashMap<>();
+
+    pipeline.setContainerName(containerName);
+    ContainerData cData = new ContainerData(containerName);
+    cData.addMetadata("VOLUME", "shire");
+    cData.addMetadata("owner)", "bilbo");
+    containerManager.createContainer(pipeline, cData);
+    for (int x = 0; x < chunkCount; x++) {
+      ChunkInfo info = getChunk(keyName, x, 0, datalen);
+      byte[] data = getData(datalen);
+      setDataChecksum(info, data);
+      chunkManager.writeChunk(pipeline, keyName, info, data);
+      String fileName = String.format("%s.data.%d", keyName, x);
+      fileHashMap.put(fileName, info);
+    }
+
+    ContainerData cNewData = containerManager.readContainer(containerName);
+    Assert.assertNotNull(cNewData);
+    Path dataDir = ContainerUtils.getDataDirectory(cNewData);
+
+    String globFormat = String.format("%s.data.*", keyName);
+    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+
+    // Read chunk via file system and verify.
+    int count = 0;
+    try (DirectoryStream<Path> stream =
+             Files.newDirectoryStream(dataDir, globFormat)) {
+      for (Path fname : stream) {
+        sha.update(FileUtils.readFileToByteArray(fname.toFile()));
+        String val = Hex.encodeHexString(sha.digest());
+        Assert.assertEquals(fileHashMap.get(fname.getFileName().toString())
+                .getChecksum(),
+            val);
+        count++;
+        sha.reset();
+      }
+      Assert.assertEquals(chunkCount, count);
+
+      // Read chunk via ReadChunk call.
+      sha.reset();
+      for (int x = 0; x < chunkCount; x++) {
+        String fileName = String.format("%s.data.%d", keyName, x);
+        ChunkInfo info = fileHashMap.get(fileName);
+        byte[] data = chunkManager.readChunk(pipeline, keyName, info);
+        sha.update(data);
+        Assert.assertEquals(Hex.encodeHexString(sha.digest()),
+            info.getChecksum());
+        sha.reset();
+      }
+    }
+  }
 
+  /**
+   * Writes a single chunk, tries to overwrite it without the overwrite flag,
+   * and then retries with the overwrite flag set.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testOverWrite() throws IOException,
+      NoSuchAlgorithmException {
+    final int datalen = 1024;
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+
+    pipeline.setContainerName(containerName);
+    ContainerData cData = new ContainerData(containerName);
+    cData.addMetadata("VOLUME", "shire");
+    cData.addMetadata("owner)", "bilbo");
+    containerManager.createContainer(pipeline, cData);
+    ChunkInfo info = getChunk(keyName, 0, 0, datalen);
+    byte[] data = getData(datalen);
+    setDataChecksum(info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data);
+    try {
+      chunkManager.writeChunk(pipeline, keyName, info, data);
+    } catch(IOException ex) {
+      Assert.assertTrue(ex.getMessage().contains(
+          "Rejecting write chunk request. OverWrite flag required."));
+    }
+
+    // With the overwrite flag it should work now.
+    info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+    chunkManager.writeChunk(pipeline, keyName, info, data);
+  }
+
+  /**
+   * This test writes the data in many small writes and then reads it back
+   * in a single large read.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testMultipleWriteSingleRead() throws IOException,
+      NoSuchAlgorithmException {
+    final int datalen = 1024;
+    final int chunkCount = 1024;
+
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+
+    pipeline.setContainerName(containerName);
+    ContainerData cData = new ContainerData(containerName);
+    cData.addMetadata("VOLUME", "shire");
+    cData.addMetadata("owner)", "bilbo");
+    containerManager.createContainer(pipeline, cData);
+    MessageDigest oldSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    for (int x = 0; x < chunkCount; x++) {
+      // we are writing to the same chunk file but at different offsets.
+      long offset = x * datalen;
+      ChunkInfo info = getChunk(keyName, 0, offset, datalen);
+      byte[] data = getData(datalen);
+      oldSha.update(data);
+      setDataChecksum(info, data);
+      chunkManager.writeChunk(pipeline, keyName, info, data);
+    }
+
+    // Request to read the whole data in a single go.
+    ChunkInfo largeChunk = getChunk(keyName, 0, 0, datalen * chunkCount);
+    byte[] newdata = chunkManager.readChunk(pipeline, keyName, largeChunk);
+    MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    newSha.update(newdata);
+    Assert.assertEquals(Hex.encodeHexString(oldSha.digest()),
+        Hex.encodeHexString(newSha.digest()));
+  }
+
+  /**
+   * Writes a chunk and deletes it, re-reads to make sure it is gone.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testDeleteChunk() throws IOException,
+      NoSuchAlgorithmException {
+    final int datalen = 1024;
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+
+    pipeline.setContainerName(containerName);
+    ContainerData cData = new ContainerData(containerName);
+    cData.addMetadata("VOLUME", "shire");
+    cData.addMetadata("owner)", "bilbo");
+    containerManager.createContainer(pipeline, cData);
+    ChunkInfo info = getChunk(keyName, 0, 0, datalen);
+    byte[] data = getData(datalen);
+    setDataChecksum(info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data);
+    chunkManager.deleteChunk(pipeline, keyName, info);
+    exception.expect(IOException.class);
+    exception.expectMessage("Unable to find the chunk file.");
+    chunkManager.readChunk(pipeline, keyName, info);
   }
-}
+}
\ No newline at end of file
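
testOverWrite above shows that rewrites are gated purely by chunk metadata: a
second write to the same chunk is rejected ("OverWrite flag required") unless
the OzoneConsts.CHUNK_OVERWRITE key is set on the ChunkInfo. A tiny
hypothetical helper capturing that convention (not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.ozone.OzoneConsts;
    import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;

    final class OverwriteSketch {
      // Marks a chunk so that ChunkManager.writeChunk will accept a rewrite
      // of an existing chunk file instead of rejecting it.
      static ChunkInfo allowOverwrite(ChunkInfo info) throws IOException {
        info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
        return info;
      }
    }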

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ac97b18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 5beb8b9..b3c7a77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
 import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -37,14 +38,13 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 
-
 public class TestOzoneContainer {
   @Test
   public void testCreateOzoneContainer() throws Exception {
-
+    String containerName = OzoneUtils.getRequestID();
     Configuration conf = new OzoneConfiguration();
     URL p = conf.getClass().getResource("");
-    String  path = p.getPath().concat(
+    String path = p.getPath().concat(
         TestOzoneContainer.class.getSimpleName());
     path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
         OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
@@ -59,7 +59,8 @@ public class TestOzoneContainer {
     cluster.waitActive();
 
 
-    Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
+    Pipeline pipeline =
+        ContainerTestHelper.createSingleNodePipeline(containerName);
     conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
         pipeline.getLeader().getContainerPort());
     OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes
@@ -69,7 +70,7 @@ public class TestOzoneContainer {
     XceiverClient client = new XceiverClient(pipeline, conf);
     client.connect();
     ContainerProtos.ContainerCommandRequestProto request =
-        ContainerTestHelper.getCreateContainerRequest();
+        ContainerTestHelper.getCreateContainerRequest(containerName);
     ContainerProtos.ContainerCommandResponseProto response =
         client.sendCommand(request);
     Assert.assertNotNull(response);
@@ -79,13 +80,13 @@ public class TestOzoneContainer {
 
   }
 
-
   @Test
   public void testOzoneContainerViaDataNode() throws Exception {
-
+    String keyName = OzoneUtils.getRequestID();
+    String containerName = OzoneUtils.getRequestID();
     Configuration conf = new OzoneConfiguration();
     URL p = conf.getClass().getResource("");
-    String  path = p.getPath().concat(
+    String path = p.getPath().concat(
         TestOzoneContainer.class.getSimpleName());
     path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
         OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
@@ -95,7 +96,8 @@ public class TestOzoneContainer {
     conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
     conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
 
-    Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
+    Pipeline pipeline =
+        ContainerTestHelper.createSingleNodePipeline(containerName);
     conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
         pipeline.getLeader().getContainerPort());
 
@@ -105,12 +107,44 @@ public class TestOzoneContainer {
     // This client talks to ozone container via datanode.
     XceiverClient client = new XceiverClient(pipeline, conf);
     client.connect();
+
+    // Create container
     ContainerProtos.ContainerCommandRequestProto request =
-        ContainerTestHelper.getCreateContainerRequest();
+        ContainerTestHelper.getCreateContainerRequest(containerName);
     ContainerProtos.ContainerCommandResponseProto response =
         client.sendCommand(request);
     Assert.assertNotNull(response);
     Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+
+    // Write Chunk
+    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+        ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
+            keyName, 1024);
+
+    response = client.sendCommand(writeChunkRequest);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertTrue(writeChunkRequest.getTraceID().equals(response.getTraceID()));
+
+    // Read Chunk
+    request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
+        .getWriteChunk());
+
+    response = client.sendCommand(request);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+
+    //Delete Chunk
+    request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest
+        .getWriteChunk());
+
+    response = client.sendCommand(request);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+
+    client.close();
     cluster.shutdown();
 
   }
