http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
new file mode 100644
index 0000000..b9ec462
--- /dev/null
+++ 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
+import org.apache.hadoop.cblock.exception.CBlockException;
+import org.apache.hadoop.cblock.meta.ContainerDescriptor;
+import org.apache.hadoop.cblock.meta.VolumeDescriptor;
+import org.apache.hadoop.cblock.meta.VolumeInfo;
+import org.apache.hadoop.cblock.proto.MountVolumeResponse;
+import org.apache.hadoop.cblock.util.KeyUtil;
+import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
+import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class maintains the key space of CBlock, more specifically, the
+ * volume to container mapping. The core data structure
+ * is a map from users to their volumes info, where volume info is a handler
+ * to a volume, containing information for IO on that volume and a storage
+ * client responsible for talking to the SCM.
+ */
+public class StorageManager {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(StorageManager.class);
+  private final ScmClient storageClient;
+  private final int numThreads;
+  private static final int MAX_THREADS =
+      Runtime.getRuntime().availableProcessors() * 2;
+  private static final int MAX_QUEUE_CAPACITY = 1024;
+  private final String cblockId;
+
+  /**
+   * We will NOT have the situation where same kv pair getting
+   * processed, but it is possible to have multiple kv pair being
+   * processed at same time.
+   *
+   * So using just ConcurrentHashMap should be sufficient
+   *
+   * Again since currently same user accessing from multiple places
+   * is not allowed, no need to consider concurrency of volume map
+   * within one user
+   */
+  private ConcurrentHashMap<String, HashMap<String, VolumeDescriptor>>
+      user2VolumeMap;
+  // size of an underlying container.
+  // TODO : assuming all containers are of the same size
+  private long containerSizeB;
+
+  public StorageManager(ScmClient storageClient,
+      OzoneConfiguration ozoneConfig, String cblockId) throws IOException {
+    this.storageClient = storageClient;
+    this.user2VolumeMap = new ConcurrentHashMap<>();
+    this.containerSizeB = storageClient.getContainerSize(null);
+    this.numThreads =
+        ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
+            CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
+    this.cblockId = cblockId;
+  }
+
+  /**
+   * This call will put the volume into in-memory map.
+   *
+   * more specifically, make the volume discoverable on jSCSI server
+   * and keep it's reference in-memory for look up.
+   * @param userName the user name of the volume.
+   * @param volumeName the name of the volume,
+   * @param volume a {@link VolumeDescriptor} object encapsulating the
+   *               information about the volume.
+   */
+  private void makeVolumeReady(String userName, String volumeName,
+      VolumeDescriptor volume) {
+    HashMap<String, VolumeDescriptor> userVolumes;
+    if (user2VolumeMap.containsKey(userName)) {
+      userVolumes = user2VolumeMap.get(userName);
+    } else {
+      userVolumes = new HashMap<>();
+      user2VolumeMap.put(userName, userVolumes);
+    }
+    userVolumes.put(volumeName, volume);
+  }
+
+  /**
+   * Called by CBlockManager to add volumes read from persistent store into
+   * memory, need to contact SCM to setup the reference to the containers given
+   * their id.
+   *
+   * Only for failover process where container meta info is read from
+   * persistent store, and containers themselves are alive.
+   *
+   * TODO : Currently, this method is not being called as failover process
+   * is not implemented yet.
+   *
+   * @param volumeDescriptor a {@link VolumeDescriptor} object encapsulating
+   *                         the information about a volume.
+   * @throws IOException when adding the volume failed. e.g. volume already
+   * exist, or no more container available.
+   */
+  public synchronized void addVolume(VolumeDescriptor volumeDescriptor)
+      throws IOException{
+    String userName = volumeDescriptor.getUserName();
+    String volumeName = volumeDescriptor.getVolumeName();
+    LOGGER.info("addVolume:" + userName + ":" + volumeName);
+    if (user2VolumeMap.containsKey(userName)
+        && user2VolumeMap.get(userName).containsKey(volumeName)) {
+      throw new CBlockException("Volume already exist for "
+          + userName + ":" + volumeName);
+    }
+    // the container ids are read from levelDB, setting up the
+    // container handlers here.
+    String[] containerIds = volumeDescriptor.getContainerIDs();
+
+    for (String containerId : containerIds) {
+      try {
+        Pipeline pipeline = storageClient.getContainer(containerId);
+        ContainerDescriptor containerDescriptor =
+            new ContainerDescriptor(containerId);
+        containerDescriptor.setPipeline(pipeline);
+        volumeDescriptor.addContainer(containerDescriptor);
+      } catch (IOException e) {
+        LOGGER.error("Getting container failed! Container:{} error:{}",
+            containerId, e);
+        throw e;
+      }
+    }
+    // now ready to put into in-memory map.
+    makeVolumeReady(userName, volumeName, volumeDescriptor);
+  }
+
+  private class CreateContainerTask implements Runnable {
+    private final VolumeDescriptor volume;
+    private final int containerIdx;
+    private final ArrayList<String> containerIds;
+    private final AtomicInteger numFailed;
+
+    CreateContainerTask(VolumeDescriptor volume, int containerIdx,
+                        ArrayList<String> containerIds,
+                        AtomicInteger numFailed) {
+      this.volume = volume;
+      this.containerIdx = containerIdx;
+      this.containerIds = containerIds;
+      this.numFailed = numFailed;
+    }
+
+    /**
+     * When an object implementing interface <code>Runnable</code> is used
+     * to create a thread, starting the thread causes the object's
+     * <code>run</code> method to be called in that separately executing
+     * thread.
+     * <p>
+     * The general contract of the method <code>run</code> is that it may
+     * take any action whatsoever.
+     *
+     * @see Thread#run()
+     */
+    public void run() {
+      ContainerDescriptor container = null;
+      try {
+        Pipeline pipeline = storageClient.createContainer(
+            HdslProtos.ReplicationType.STAND_ALONE,
+            HdslProtos.ReplicationFactor.ONE,
+            KeyUtil.getContainerName(volume.getUserName(),
+                volume.getVolumeName(), containerIdx), cblockId);
+
+        container = new ContainerDescriptor(pipeline.getContainerName());
+
+        container.setPipeline(pipeline);
+        container.setContainerIndex(containerIdx);
+        volume.addContainer(container);
+        containerIds.set(containerIdx, container.getContainerID());
+      } catch (Exception e) {
+        numFailed.incrementAndGet();
+        if (container != null) {
+          LOGGER.error("Error creating container Container:{}:" +
+              " index:{} error:{}", container.getContainerID(),
+              containerIdx, e);
+        } else {
+          LOGGER.error("Error creating container.", e);
+        }
+      }
+    }
+  }
+
+  private boolean createVolumeContainers(VolumeDescriptor volume) {
+    ArrayList<String> containerIds = new ArrayList<>();
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        Math.min(numThreads, MAX_THREADS),
+        MAX_THREADS, 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    AtomicInteger numFailedCreates = new AtomicInteger(0);
+    long allocatedSize = 0;
+    int containerIdx = 0;
+    while (allocatedSize < volume.getVolumeSize()) {
+      // adding null to allocate space in ArrayList
+      containerIds.add(containerIdx, null);
+      Runnable task = new CreateContainerTask(volume, containerIdx,
+          containerIds, numFailedCreates);
+      executor.submit(task);
+      allocatedSize += containerSizeB;
+      containerIdx += 1;
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error creating volume:{} error:{}",
+          volume.getVolumeName(), e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+
+    volume.setContainerIDs(containerIds);
+    return numFailedCreates.get() == 0;
+  }
+
+  private void deleteContainer(String containerID, boolean force) {
+    try {
+      Pipeline pipeline = storageClient.getContainer(containerID);
+      storageClient.deleteContainer(pipeline, force);
+    } catch (Exception e) {
+      LOGGER.error("Error deleting container Container:{} error:{}",
+          containerID, e);
+    }
+  }
+
+  private void deleteVolumeContainers(List<String> containers, boolean force)
+      throws CBlockException {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        Math.min(numThreads, MAX_THREADS),
+        MAX_THREADS, 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    for (String deleteContainer : containers) {
+      if (deleteContainer != null) {
+        Runnable task = () -> deleteContainer(deleteContainer, force);
+        executor.submit(task);
+      }
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error deleting containers error:{}", e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Called by CBlock server when creating a fresh volume. The core
+   * logic is adding needed information into in-memory meta data.
+   *
+   * @param userName the user name of the volume.
+   * @param volumeName the name of the volume.
+   * @param volumeSize the size of the volume.
+   * @param blockSize the block size of the volume.
+   * @throws CBlockException when the volume can not be created.
+   */
+  public synchronized void createVolume(String userName, String volumeName,
+      long volumeSize, int blockSize) throws CBlockException {
+    LOGGER.debug("createVolume:" + userName + ":" + volumeName);
+    if (user2VolumeMap.containsKey(userName)
+        && user2VolumeMap.get(userName).containsKey(volumeName)) {
+      throw new CBlockException("Volume already exist for "
+          + userName + ":" + volumeName);
+    }
+    if (volumeSize < blockSize) {
+      throw new CBlockException("Volume size smaller than block size? " +
+          "volume size:" + volumeSize + " block size:" + blockSize);
+    }
+    VolumeDescriptor volume
+        = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
+    boolean success = createVolumeContainers(volume);
+    if (!success) {
+      // cleanup the containers and throw the exception
+      deleteVolumeContainers(volume.getContainerIDsList(), true);
+      throw new CBlockException("Error when creating volume:" + volumeName);
+    }
+    makeVolumeReady(userName, volumeName, volume);
+  }
+
+  /**
+   * Called by CBlock server to delete a specific volume. Mainly
+   * to check whether it can be deleted, and remove it from in-memory meta
+   * data.
+   *
+   * @param userName the user name of the volume.
+   * @param volumeName the name of the volume.
+   * @param force if set to false, only delete volume it is empty, otherwise
+   *              throw exception. if set to true, delete regardless.
+   * @throws CBlockException when the volume can not be deleted.
+   */
+  public synchronized void deleteVolume(String userName, String volumeName,
+      boolean force) throws CBlockException {
+    if (!user2VolumeMap.containsKey(userName)
+        || !user2VolumeMap.get(userName).containsKey(volumeName)) {
+      throw new CBlockException("Deleting non-exist volume "
+          + userName + ":" + volumeName);
+    }
+    if (!force && !user2VolumeMap.get(userName).get(volumeName).isEmpty()) {
+      throw new CBlockException("Deleting a non-empty volume without force!");
+    }
+    VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
+    deleteVolumeContainers(volume.getContainerIDsList(), force);
+    if (user2VolumeMap.get(userName).size() == 0) {
+      user2VolumeMap.remove(userName);
+    }
+  }
+
+  /**
+   * Called by CBlock server to get information of a specific volume.
+   *
+   * @param userName the user name of the volume.
+   * @param volumeName the name of the volume.
+   * @return a {@link VolumeInfo} object encapsulating the information of the
+   * volume.
+   * @throws CBlockException when the information can not be retrieved.
+   */
+  public synchronized VolumeInfo infoVolume(String userName, String volumeName)
+      throws CBlockException {
+    if (!user2VolumeMap.containsKey(userName)
+        || !user2VolumeMap.get(userName).containsKey(volumeName)) {
+      throw new CBlockException("Getting info for non-exist volume "
+          + userName + ":" + volumeName);
+    }
+    return user2VolumeMap.get(userName).get(volumeName).getInfo();
+  }
+
+  /**
+   * Called by CBlock server to check whether the given volume can be
+   * mounted, i.e. whether it can be found in the meta data.
+   *
+   * return a {@link MountVolumeResponse} with isValid flag to indicate
+   * whether the volume can be mounted or not.
+   *
+   * @param userName the user name of the volume.
+   * @param volumeName the name of the volume
+   * @return a {@link MountVolumeResponse} object encapsulating whether the
+   * volume is valid, and if yes, the requried information for client to
+   * read/write the volume.
+   */
+  public synchronized MountVolumeResponse isVolumeValid(
+      String userName, String volumeName) {
+    if (!user2VolumeMap.containsKey(userName)
+        || !user2VolumeMap.get(userName).containsKey(volumeName)) {
+      // in the case of invalid volume, no need to set any value other than
+      // isValid flag.
+      return new MountVolumeResponse(false, null, null, 0, 0, null, null);
+    }
+    VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName);
+    return new MountVolumeResponse(true, userName,
+        volumeName, volume.getVolumeSize(), volume.getBlockSize(),
+        volume.getContainerPipelines(), volume.getPipelines());
+  }
+
+  /**
+   * Called by CBlock manager to list all volumes.
+   *
+   * @param userName the userName whose volume to be listed, if set to null,
+   *                 all volumes will be listed.
+   * @return a list of {@link VolumeDescriptor} representing all volumes
+   * requested.
+   */
+  public synchronized List<VolumeDescriptor> getAllVolume(String userName) {
+    ArrayList<VolumeDescriptor> allVolumes = new ArrayList<>();
+    if (userName == null) {
+      for (Map.Entry<String, HashMap<String, VolumeDescriptor>> entry
+          : user2VolumeMap.entrySet()) {
+        allVolumes.addAll(entry.getValue().values());
+      }
+    } else {
+      if (user2VolumeMap.containsKey(userName)) {
+        allVolumes.addAll(user2VolumeMap.get(userName).values());
+      }
+    }
+    return allVolumes;
+  }
+
+  /**
+   * Only for testing the behavior of create/delete volumes.
+   */
+  @VisibleForTesting
+  public VolumeDescriptor getVolume(String userName, String volumeName) {
+    if (!user2VolumeMap.containsKey(userName)
+        || !user2VolumeMap.get(userName).containsKey(volumeName)) {
+      return null;
+    }
+    return user2VolumeMap.get(userName).get(volumeName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java
new file mode 100644
index 0000000..4426e6d
--- /dev/null
+++ 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.storage;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java
new file mode 100644
index 0000000..beb9e32
--- /dev/null
+++ 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock.util;
+
+/**
+ * A simply class that generates key mappings. (e.g. from (userName, 
volumeName)
+ * pair to a single string volumeKey.
+ */
+public final class KeyUtil {
+  public static String getVolumeKey(String userName, String volumeName) {
+    return userName + ":" + volumeName;
+  }
+
+  public static String getContainerName(String userName, String volumeName,
+      int containerID) {
+    return getVolumeKey(userName, volumeName) + "#" + containerID;
+  }
+
+  public static String getUserNameFromVolumeKey(String key) {
+    return key.split(":")[0];
+  }
+
+  public static String getVolumeFromVolumeKey(String key) {
+    return key.split(":")[1];
+  }
+
+  public static boolean isValidVolumeKey(String key) {
+    return key.contains(":");
+  }
+
+  private KeyUtil() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java
new file mode 100644
index 0000000..5b9aa0c
--- /dev/null
+++ 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.util;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto 
b/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto
new file mode 100644
index 0000000..160b254
--- /dev/null
+++ b/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+option java_package = "org.apache.hadoop.cblock.protocol.proto";
+option java_outer_classname = "CBlockClientServerProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.cblock;
+
+import "hdsl.proto";
+import "CBlockServiceProtocol.proto";
+/**
+* This message is sent from CBlock client side to CBlock server to
+* mount a volume specified by owner name and volume name.
+*
+* Right now, this is the only communication between client and server.
+* After the volume is mounted, CBlock client will talk to containers
+* by itself, nothing to do with CBlock server.
+*/
+message MountVolumeRequestProto {
+    required string userName = 1;
+    required string volumeName = 2;
+}
+
+/**
+* This message is sent from CBlock server to CBlock client as response
+* of mount a volume. It checks the whether the volume is valid to access
+* at all.(e.g. volume exist)
+*
+* And include enough information (volume size, block size, list of
+* containers for this volume) for client side to perform read/write on
+* the volume.
+*/
+message MountVolumeResponseProto {
+    required bool isValid = 1;
+    optional string userName = 2;
+    optional string volumeName = 3;
+    optional uint64 volumeSize = 4;
+    optional uint32 blockSize = 5;
+    repeated ContainerIDProto allContainerIDs = 6;
+}
+
+/**
+* This message include ID of container which can be used to locate the
+* container. Since the order of containers needs to be maintained, also
+* includes a index field to verify the correctness of the order.
+*/
+message ContainerIDProto {
+    required string containerID = 1;
+    required uint64 index = 2;
+    // making pipeline optional to be compatible with exisiting tests
+    optional hadoop.hdsl.Pipeline pipeline = 3;
+}
+
+
+message ListVolumesRequestProto {
+
+}
+
+message ListVolumesResponseProto {
+    repeated VolumeInfoProto volumeEntry = 1;
+}
+
+
+service CBlockClientServerProtocolService {
+    /**
+    * mount the volume.
+    */
+    rpc mountVolume(MountVolumeRequestProto) returns 
(MountVolumeResponseProto);
+
+    rpc listVolumes(ListVolumesRequestProto) returns(ListVolumesResponseProto);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto 
b/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto
new file mode 100644
index 0000000..36e4b59
--- /dev/null
+++ b/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.cblock.protocol.proto";
+option java_outer_classname = "CBlockServiceProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.cblock;
+
+/**
+* This message is sent to CBlock server to create a volume. Creating
+* volume requries four parameters: owner of the volume, name of the volume
+* size of volume and block size of the volume.
+*/
+message CreateVolumeRequestProto {
+    required string userName = 1;
+    required string volumeName = 2;
+    required uint64 volumeSize = 3;
+    optional uint32 blockSize = 4 [default = 4096];
+}
+
+/**
+* Empty response message.
+*/
+message CreateVolumeResponseProto {
+
+}
+
+/**
+* This message is sent to CBlock server to delete a volume. The volume
+* is specified by owner name and volume name. If force is set to
+* false, volume will be deleted only if it is empty. Otherwise delete it
+* regardless.
+*/
+message DeleteVolumeRequestProto {
+    required string userName = 1;
+    required string volumeName = 2;
+    optional bool force = 3;
+}
+
+/**
+* Empty response message.
+*/
+message DeleteVolumeResponseProto {
+
+}
+
+/**
+* This message is sent to CBlock server to request info of a volume. The
+* volume is specified by owner name and volume name.
+*/
+message InfoVolumeRequestProto {
+    required string userName = 1;
+    required string volumeName = 2;
+}
+
+/**
+* This message describes the information of a volume.
+* Currently, the info includes the volume creation parameters and a number
+* as the usage of the volume, in terms of number of bytes.
+*/
+message VolumeInfoProto {
+    required string userName = 1;
+    required string volumeName = 2;
+    required uint64 volumeSize = 3;
+    required uint64 blockSize = 4;
+    optional uint64 usage = 5;
+    // TODO : potentially volume ACL
+}
+
+/**
+* This message is sent from CBlock server as response of info volume request.
+*/
+message InfoVolumeResponseProto {
+    optional VolumeInfoProto volumeInfo = 1;
+}
+
+/**
+* This message is sent to CBlock server to list all available volume.
+*/
+message ListVolumeRequestProto {
+    optional string userName = 1;
+}
+
+/**
+* This message is sent from CBlock server as response of volume listing.
+*/
+message ListVolumeResponseProto {
+    repeated VolumeInfoProto volumeEntry = 1;
+}
+
+service CBlockServiceProtocolService {
+    /**
+    * Create a volume.
+    */
+    rpc createVolume(CreateVolumeRequestProto) 
returns(CreateVolumeResponseProto);
+
+    /**
+    * Delete a volume.
+    */
+    rpc deleteVolume(DeleteVolumeRequestProto) 
returns(DeleteVolumeResponseProto);
+
+    /**
+    * Get info of a volume.
+    */
+    rpc infoVolume(InfoVolumeRequestProto) returns(InfoVolumeResponseProto);
+
+    /**
+    * List all available volumes.
+    */
+    rpc listVolume(ListVolumeRequestProto) returns(ListVolumeResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/resources/cblock-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/resources/cblock-default.xml 
b/hadoop-cblock/server/src/main/resources/cblock-default.xml
new file mode 100644
index 0000000..ebf36cd
--- /dev/null
+++ b/hadoop-cblock/server/src/main/resources/cblock-default.xml
@@ -0,0 +1,347 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<!-- Do not modify this file directly.  Instead, copy entries that you -->
+<!-- wish to modify from this file into ozone-site.xml and change them -->
+<!-- there.  If ozone-site.xml does not already exist, create it.      -->
+
+<!--Tags supported are OZONE, CBLOCK, MANAGEMENT, SECURITY, PERFORMANCE,   -->
+<!--DEBUG, CLIENT, SERVER, KSM, SCM, CRITICAL, RATIS, CONTAINER, REQUIRED, -->
+<!--REST, STORAGE, PIPELINE, STANDALONE                                    -->
+
+<configuration>
+
+  <!--CBlock Settings-->
+  <property>
+    <name>dfs.cblock.block.buffer.flush.interval</name>
+    <value>60s</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      Controls the frequency at this the local cache flushes the
+      blocks to the remote containers.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.block.buffer.size</name>
+    <value>512</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      Size of the local cache for blocks. So cache size will be block
+      size multiplied by this number.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.core.min.pool.size</name>
+    <value>16</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      A minimum number of threads in the pool that cBlock cache will
+      use for the background I/O to remote containers.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.max.pool.size</name>
+    <value>256</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      Maximum number of threads in the pool that cBlock cache will
+      use for background I/O to remote containers.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.keep.alive</name>
+    <value>60s</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      If the cblock cache has no I/O, then the threads in the cache
+      pool are kept idle for this amount of time before shutting down.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.leveldb.cache.size.mb</name>
+    <value>256</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      The amount of physical memory allocated to the local cache. The
+      SCSI driver will allocate this much RAM cache instances.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.max.retry</name>
+    <value>65536</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      If the local cache is enabled then, CBlock writes to the local
+      cache when I/O happens. Then the background I/O threads write this
+      block to the remote containers. This value controls how many times the
+      background thread should attempt to do I/O to the remote containers
+      before giving up.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.queue.size.in.kb</name>
+    <value>256</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      Size of the in memory cache queue, that is flushed to local
+      disk.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.cache.thread.priority</name>
+    <value>5</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      Priority of cache flusher thread, affecting the relative performance of
+      write and read. Supported values are 1, 5, 10.
+      Use 10 for high priority and 1 for low priority.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.container.size.gb</name>
+    <value>5</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      The size of ozone container in the number of GBs. Note that
+      this is not setting container size for ozone. This setting is
+      instructing CBlock to manage containers at a standard size.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.disk.cache.path</name>
+    <value>${hadoop.tmp.dir}/cblockCacheDB</value>
+    <tag>CBLOCK, REQUIRED</tag>
+    <description>
+      The default path for the cblock local cache. If the cblock
+      local cache is enabled, then it must be set to a valid path. This cache
+      *should* be mapped to the fastest disk on a given machine, For example,
+      an SSD drive would be a good idea. Currently, all mounted disk on a
+      data node is mapped to a single path, so having a large number of IOPS
+      is essential.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.jscsi-address</name>
+    <value/>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      The address that cblock will be bind to, should be a host:port
+      format, This setting is required for cblock server to start.
+      This address to be used by jscsi to mount volume.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.jscsi.cblock.server.address</name>
+    <value>127.0.0.1</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      The address local jscsi server will use to talk to cblock manager.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.jscsi.port</name>
+    <value>9811</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      The port on CBlockManager node for jSCSI to talk to.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.jscsi.rpc-bind-host</name>
+    <value>0.0.0.0</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      The actual address the cblock jscsi rpc server will bind to. If
+      this optional address is set, it overrides only the hostname portion of
+      dfs.cblock.jscsi-address.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.jscsi.server.address</name>
+    <value>0.0.0.0</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      The address that jscsi server will be running, it is nice have one
+      local jscsi server for each client(Linux JSCSI client) that tries to
+      mount cblock.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.manager.pool.size</name>
+    <value>16</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      Number of active threads that cblock manager will use for container
+      operations. The maximum number of the threads are limited to the
+      processor count * 2.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.rpc.timeout</name>
+    <value>300s</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      RPC timeout used for cblock CLI operations. When you
+      create very large disks, like 5TB, etc. The number of containers
+      allocated in the system is huge. It is will 5TB/5GB, which is 1000
+      containers. The client CLI might timeout even though the cblock manager
+      creates the specified disk. This value allows the user to wait for a
+      longer period.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.scm.ipaddress</name>
+    <value>127.0.0.1</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      IP address used by cblock to connect to SCM.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.scm.port</name>
+    <value>9860</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      Port used by cblock to connect to SCM.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.service.handler.count</name>
+    <value>10</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      Default number of handlers for CBlock service rpc.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.service.leveldb.path</name>
+    <value>${hadoop.tmp.dir}/cblock_server.dat</value>
+    <tag>CBLOCK, REQUIRED</tag>
+    <description>
+      Default path for the cblock meta data store.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.service.rpc-bind-host</name>
+    <value>0.0.0.0</value>
+    <tag>CBLOCK, MANAGEMENT</tag>
+    <description>
+      The actual address the cblock service RPC server will bind to.
+      If the optional address is set, it overrides only the hostname portion of
+      dfs.cblock.servicerpc-address.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.servicerpc-address</name>
+    <value/>
+    <tag>CBLOCK, MANAGEMENT, REQUIRED</tag>
+    <description>
+      The address that cblock will be bind to, should be a host:port
+      format, this setting is required for cblock server to start.
+      This address is used for cblock management operations like create, 
delete,
+      info and list volumes
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.short.circuit.io</name>
+    <value>false</value>
+    <tag>CBLOCK, PERFORMANCE</tag>
+    <description>
+      Enables use of the local cache in cblock. Enabling this allows
+      I/O against the local cache and background threads do actual I/O against
+      the
+      containers.
+    </description>
+  </property>
+  <property>
+    <name>dfs.cblock.trace.io</name>
+    <value>false</value>
+    <tag>CBLOCK, DEBUG</tag>
+    <description>Default flag for enabling trace io, Trace I/O logs all I/O 
with
+      hashes of
+      data. This is useful for detecting things like data corruption.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.iscsi.advertised.ip</name>
+    <value>0.0.0.0</value>
+    <tag>CBLOCK</tag>
+    <description>
+      IP address returned during the iscsi discovery.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.iscsi.advertised.port</name>
+    <value>3260</value>
+    <tag>CBLOCK</tag>
+    <description>
+      TCP port returned during the iscsi discovery.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.kubernetes.dynamic-provisioner.enabled</name>
+    <value>false</value>
+    <tag>CBLOCK, KUBERNETES</tag>
+    <description>Flag to enable automatic creation of cblocks and
+      kubernetes PersitentVolumes in kubernetes environment.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.kubernetes.cblock-user</name>
+    <value>iqn.2001-04.org.apache.hadoop</value>
+    <tag>CBLOCK, KUBERNETES</tag>
+    <description>CBlock user to use for the dynamic provisioner.
+      This user will own all of the auto-created cblocks.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.kubernetes.configfile</name>
+    <value></value>
+    <tag>CBLOCK, KUBERNETES</tag>
+    <description>Location of the kubernetes configuration file
+      to access the kubernetes cluster. Not required inside a pod
+      as the default service account will be if this value is
+      empty.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.iscsi.advertised.ip</name>
+    <value></value>
+    <tag>CBLOCK, KUBERNETES</tag>
+    <description>IP where the cblock target server is available
+      from the kubernetes nodes. Usually it's a cluster ip address
+      which is defined by a deployed Service.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.iscsi.advertised.port</name>
+    <value>3260</value>
+    <tag>CBLOCK, KUBERNETES</tag>
+    <description>Port where the cblock target server is available
+      from the kubernetes nodes. Could be different from the
+      listening port if jscsi is behind a Service.
+    </description>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
 
b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
new file mode 100644
index 0000000..e1eb36f
--- /dev/null
+++ 
b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import 
org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL;
+
+/**
+ * Tests for Local Cache Buffer Manager.
+ */
+public class TestBufferManager {
+  private final static long GB = 1024 * 1024 * 1024;
+  private final static int KB = 1024;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration config;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    String path = GenericTestUtils.getTempPath(
+        TestBufferManager.class.getSimpleName());
+    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    cluster = new MiniOzoneClassicCluster.Builder(config)
+        .numDataNodes(1).setHandlerType("distributed").build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(config);
+  }
+
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster);
+  }
+
+  /**
+   * createContainerAndGetPipeline creates a set of containers and returns the
+   * Pipelines that define those containers.
+   *
+   * @param count - Number of containers to create.
+   * @return - List of Pipelines.
+   * @throws IOException
+   */
+  private List<Pipeline> createContainerAndGetPipeline(int count)
+      throws IOException {
+    List<Pipeline> containerPipelines = new LinkedList<>();
+    for (int x = 0; x < count; x++) {
+      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+      String containerName = "container" + RandomStringUtils.randomNumeric(10);
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(
+              xceiverClientManager.getType(),
+              xceiverClientManager.getFactor(), containerName, "CBLOCK");
+      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+      ContainerProtocolCalls.createContainer(client, traceID);
+      // This step is needed since we set private data on pipelines, when we
+      // read the list from CBlockServer. So we mimic that action here.
+      pipeline.setData(Longs.toByteArray(x));
+      containerPipelines.add(pipeline);
+      xceiverClientManager.releaseClient(client);
+    }
+    return containerPipelines;
+  }
+
+  /**
+   * This test writes some block to the cache and then shuts down the cache.
+   * The cache is then restarted to check that the
+   * correct number of blocks are read from Dirty Log
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testEmptyBlockBufferHandling() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestBufferManager.class.getSimpleName()
+            + RandomStringUtils.randomNumeric(4));
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    List<Pipeline> pipelines = createContainerAndGetPipeline(10);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    // Write data to the cache
+    cache.put(1, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(1, metrics.getNumWriteOps());
+    cache.put(2, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(2, metrics.getNumWriteOps());
+
+    // Store the previous block buffer position
+    Assert.assertEquals(2, metrics.getNumBlockBufferUpdates());
+    // Simulate a shutdown by closing the cache
+    cache.close();
+    Thread.sleep(1000);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
+                                metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+    Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits());
+
+    // Restart cache and check that right number of entries are read
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig,
+            xceiverClientManager, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread fllushListenerThread = new Thread(newFlusher);
+    fllushListenerThread.setDaemon(true);
+    fllushListenerThread.start();
+
+    Thread.sleep(5000);
+    Assert.assertEquals(metrics.getNumBlockBufferUpdates(),
+                                      newMetrics.getNumDirtyLogBlockRead());
+    Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
+            * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
+    // Now shutdown again, nothing should be flushed
+    newFlusher.shutdown();
+    Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates());
+    Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
+  }
+
+  @Test
+  public void testPeriodicFlush() throws IOException,
+      InterruptedException, TimeoutException{
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestBufferManager.class.getSimpleName()
+            + RandomStringUtils.randomNumeric(4));
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig
+        .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 5, SECONDS);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread.sleep(8000);
+    // Ticks will be at 5s, 10s and so on, so this count should be 1
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    // Nothing pushed to cache, so nothing should be written
+    Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+    cache.close();
+    // After close, another trigger should happen but still no data written
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+  }
+
+  @Test
+  public void testSingleBufferFlush() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestBufferManager.class.getSimpleName()
+            + RandomStringUtils.randomNumeric(4));
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+
+    for (int i = 0; i < 511; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    // After writing 511 block no flush should happen
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+
+
+    // After one more block it should
+    cache.put(512, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    Thread.sleep(1000);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    cache.close();
+    Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE),
+                                metrics.getNumBytesDirtyLogWritten());
+  }
+
+  @Test
+  public void testMultipleBuffersFlush() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestBufferManager.class.getSimpleName()
+            + RandomStringUtils.randomNumeric(4));
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig
+        .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 120, SECONDS);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < 512; j++) {
+        cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8));
+      }
+      // Flush should be triggered after every 512 block write
+      Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered());
+    }
+    Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles());
+    Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes());
+    cache.close();
+    Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE),
+        metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted());
+  }
+
+  @Test
+  public void testSingleBlockFlush() throws IOException,
+      InterruptedException, TimeoutException{
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestBufferManager.class.getSimpleName()
+            + RandomStringUtils.randomNumeric(4));
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig
+        .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL,
+            5, SECONDS);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    cache.put(0, data.getBytes(StandardCharsets.UTF_8));
+    Thread.sleep(8000);
+    // Ticks will be at 5s, 10s and so on, so this count should be 1
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    // 1 block written to cache, which should be flushed
+    Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    cache.close();
+    // After close, another trigger should happen but no data should be written
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+  }
+
+  @Test
+  public void testRepeatedBlockWrites() throws IOException,
+      InterruptedException, TimeoutException{
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestBufferManager.class.getSimpleName()
+            + RandomStringUtils.randomNumeric(4));
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    Thread fllushListenerThread = new Thread(flusher);
+    fllushListenerThread.setDaemon(true);
+    fllushListenerThread.start();
+    cache.start();
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(512, metrics.getNumWriteOps());
+    Assert.assertEquals(512, metrics.getNumBlockBufferUpdates());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    Thread.sleep(5000);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+
+
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(1024, metrics.getNumWriteOps());
+    Assert.assertEquals(1024, metrics.getNumBlockBufferUpdates());
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+
+    Thread.sleep(5000);
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushCompleted());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java
 
b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java
new file mode 100644
index 0000000..4139ac6
--- /dev/null
+++ 
b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import org.apache.hadoop.conf.TestConfigurationFieldsBase;
+
+/**
+ * Tests if configuration constants documented in ozone-defaults.xml.
+ */
+public class TestCBlockConfigurationFields extends TestConfigurationFieldsBase 
{
+
+  @Override
+  public void initializeMemberVariables() {
+    xmlFilename = new String("cblock-default.xml");
+    configurationClasses =
+        new Class[] {CBlockConfigKeys.class};
+    errorIfMissingConfigProps = true;
+    errorIfMissingXmlProps = true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
 
b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
new file mode 100644
index 0000000..d995ba6
--- /dev/null
+++ 
b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState;
+import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationFactor;
+import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+
+/**
+ * Tests for Cblock read write functionality.
+ */
+public class TestCBlockReadWrite {
+  private final static long GB = 1024 * 1024 * 1024;
+  private final static int KB = 1024;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration config;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestCBlockReadWrite.class.getSimpleName());
+    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    cluster = new MiniOzoneClassicCluster.Builder(config)
+        .numDataNodes(1)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(config);
+  }
+
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster);
+  }
+
+  /**
+   * getContainerPipelines creates a set of containers and returns the
+   * Pipelines that define those containers.
+   *
+   * @param count - Number of containers to create.
+   * @return - List of Pipelines.
+   * @throws IOException throws Exception
+   */
+  private List<Pipeline> getContainerPipeline(int count) throws IOException {
+    List<Pipeline> containerPipelines = new LinkedList<>();
+    for (int x = 0; x < count; x++) {
+      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+      String containerName = "container" + RandomStringUtils.randomNumeric(10);
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(
+              xceiverClientManager.getType(),
+              xceiverClientManager.getFactor(), containerName, "CBLOCK");
+      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+      ContainerProtocolCalls.createContainer(client, traceID);
+      // This step is needed since we set private data on pipelines, when we
+      // read the list from CBlockServer. So we mimic that action here.
+      pipeline.setData(Longs.toByteArray(x));
+      containerPipelines.add(pipeline);
+    }
+    return containerPipelines;
+  }
+
+  /**
+   * This test creates a cache and performs a simple write / read.
+   * The operations are done by bypassing the cache.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDirectIO() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration cConfig = new OzoneConfiguration();
+    cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+    cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    final long blockID = 0;
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    String dataHash = DigestUtils.sha256Hex(data);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(cConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Assert.assertFalse(cache.isShortCircuitIOEnabled());
+    cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(1, metrics.getNumWriteOps());
+    // Please note that this read is directly from remote container
+    LogicalBlock block = cache.get(blockID);
+    Assert.assertEquals(1, metrics.getNumReadOps());
+    Assert.assertEquals(0, metrics.getNumReadCacheHits());
+    Assert.assertEquals(1, metrics.getNumReadCacheMiss());
+    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
+
+    cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(2, metrics.getNumWriteOps());
+    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
+    // Please note that this read is directly from remote container
+    block = cache.get(blockID + 1);
+    Assert.assertEquals(2, metrics.getNumReadOps());
+    Assert.assertEquals(0, metrics.getNumReadCacheHits());
+    Assert.assertEquals(2, metrics.getNumReadCacheMiss());
+    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+    String readHash = DigestUtils.sha256Hex(block.getData().array());
+    Assert.assertEquals("File content does not match.", dataHash, readHash);
+    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    cache.close();
+  }
+
+  /**
+   * This test writes some block to the cache and then shuts down the cache
+   * The cache is then restarted with "short.circuit.io" disable to check
+   * that the blocks are read correctly from the container.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testContainerWrites() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestCBlockReadWrite.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 3,
+        TimeUnit.SECONDS);
+    XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+
+    int numUniqueBlocks = 4;
+    String[] data = new String[numUniqueBlocks];
+    String[] dataHash = new String[numUniqueBlocks];
+    for (int i = 0; i < numUniqueBlocks; i++) {
+      data[i] = RandomStringUtils.random(4 * KB);
+      dataHash[i] = DigestUtils.sha256Hex(data[i]);
+    }
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xcm, metrics);
+    List<Pipeline> pipelines = getContainerPipeline(10);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+    Assert.assertTrue(cache.isShortCircuitIOEnabled());
+    // Write data to the cache
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
+    }
+    // Close the cache and flush the data to the containers
+    cache.close();
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(512, metrics.getNumWriteOps());
+    Thread.sleep(3000);
+    flusher.shutdown();
+    Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
+    // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Assert.assertFalse(newCache.isShortCircuitIOEnabled());
+    // this read will be from the container, also match the hash
+    for (int i = 0; i < 512; i++) {
+      LogicalBlock block = newCache.get(i);
+      String readHash = DigestUtils.sha256Hex(block.getData().array());
+      Assert.assertEquals("File content does not match, for index:"
+          + i, dataHash[i % numUniqueBlocks], readHash);
+    }
+    Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
+    newCache.close();
+    newFlusher.shutdown();
+  }
+
+  @Test
+  public void testRetryLog() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    String path = GenericTestUtils
+        .getTempPath(TestCBlockReadWrite.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL,
+        3,
+        TimeUnit.SECONDS);
+
+    int numblocks = 10;
+    flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+
+    List<Pipeline> fakeContainerPipelines = new LinkedList<>();
+    PipelineChannel pipelineChannel = new PipelineChannel("fake",
+        LifeCycleState.OPEN, ReplicationType.STAND_ALONE, 
ReplicationFactor.ONE,
+        "fake");
+    Pipeline fakePipeline = new Pipeline("fake", pipelineChannel);
+    fakePipeline.setData(Longs.toByteArray(1));
+    fakeContainerPipelines.add(fakePipeline);
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(fakeContainerPipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+
+    for (int i = 0; i < numblocks; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(numblocks, metrics.getNumWriteOps());
+    Thread.sleep(3000);
+
+    // all the writes to the container will fail because of fake pipelines
+    Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
+    Assert.assertTrue(
+        metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
+    cache.close();
+    flusher.shutdown();
+
+    // restart cache with correct pipelines, now blocks should be uploaded
+    // correctly
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig,
+            xceiverClientManager, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread newFlushListenerThread = new Thread(newFlusher);
+    newFlushListenerThread.setDaemon(true);
+    newFlushListenerThread.start();
+    Thread.sleep(3000);
+    Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
+    Assert.assertEquals(0, 
newMetrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to