http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/pom.xml b/hadoop-cblock/server/pom.xml
new file mode 100644
index 0000000..17af85d
--- /dev/null
+++ b/hadoop-cblock/server/pom.xml
@@ -0,0 +1,169 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-cblock</artifactId>
+    <version>3.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>hadoop-cblock-server</artifactId>
+  <version>3.2.0-SNAPSHOT</version>
+  <description>Apache Hadoop CBlock Server</description>
+  <name>Apache Hadoop CBlock Server</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <hadoop.component>cblock</hadoop.component>
+    <is.hadoop.component>true</is.hadoop.component>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdsl-server-framework</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-ozone-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdsl-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdsl-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-ozone-integration-test</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jscsi</groupId>
+      <artifactId>target</artifactId>
+      <version>2.6.0</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.kubernetes</groupId>
+      <artifactId>client-java</artifactId>
+      <version>1.0.0-beta1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.swagger</groupId>
+          <artifactId>swagger-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.github.stefanbirkner</groupId>
+          <artifactId>system-rules</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+  </dependencies>
+
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/dynamicprovisioner/expected1-pv.json</exclude>
+            <exclude>src/test/resources/dynamicprovisioner/input1-pvc.json</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>
+                  ${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto
+                </param>
+                <param>
+                  ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/
+                </param>
+                <param>
+                  ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/
+                </param>
+                <param>
+                  ${basedir}/../../hadoop-hdsl/common/src/main/proto/
+                </param>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>CBlockClientServerProtocol.proto</include>
+                  <include>CBlockServiceProtocol.proto</include>
+                </includes>
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
new file mode 100644
index 0000000..ec5d7d2
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import static java.lang.Thread.NORM_PRIORITY;
+
+/**
+ * This class contains constants for configuration keys used in CBlock.
+ */
+public final class CBlockConfigKeys {
+
+  public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY =
+      "dfs.cblock.servicerpc-address";
+  public static final int DFS_CBLOCK_SERVICERPC_PORT_DEFAULT =
+      9810;
+  public static final String DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT =
+      "0.0.0.0";
+
+  public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY =
+      "dfs.cblock.jscsi-address";
+
+  // The port on the CBlockManager node for jSCSI to query.
+  public static final String DFS_CBLOCK_JSCSI_PORT_KEY =
+      "dfs.cblock.jscsi.port";
+  public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT =
+      9811;
+
+  public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY =
+      "dfs.cblock.service.rpc-bind-host";
+  public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY =
+      "dfs.cblock.jscsi.rpc-bind-host";
+
+  // default block size is 4KB
+  public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT =
+      4096;
+
+  public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY =
+      "dfs.cblock.service.handler.count";
+  public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10;
+
+  public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY =
+      "dfs.cblock.service.leveldb.path";
+  //TODO : find a better place
+  public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT =
+      "/tmp/cblock_levelDB.dat";
+
+
+  public static final String DFS_CBLOCK_DISK_CACHE_PATH_KEY =
+      "dfs.cblock.disk.cache.path";
+  public static final String DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT =
+      "/tmp/cblockCacheDB";
+  /**
+   * Setting this flag to true makes the block layer compute a sha256 hash of
+   * the data and log that information along with block ID. This is very
+   * useful for doing trace based simulation of various workloads. Since it is
+   * computing a hash for each block this could be expensive, hence default
+   * is false.
+   */
+  public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io";
+  public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false;
+
+  public static final String DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO =
+      "dfs.cblock.short.circuit.io";
+  public static final boolean DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT =
+      false;
+
+  /**
+   * Cache queue size in units of 1024 entries; 256 indicates 256 * 1024 entries.
+   */
+  public static final String DFS_CBLOCK_CACHE_QUEUE_SIZE_KB =
+      "dfs.cblock.cache.queue.size.in.kb";
+  public static final int DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT = 256;
+
+  /**
+   * Minimum number of threads that the cache pool will use for background I/O.
+   */
+  public static final String DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE =
+      "dfs.cblock.cache.core.min.pool.size";
+  public static final int DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT = 16;
+
+  /**
+   * Maximum number of threads that the cache pool will use for background I/O.
+   */
+  public static final String DFS_CBLOCK_CACHE_MAX_POOL_SIZE =
+      "dfs.cblock.cache.max.pool.size";
+  public static final int DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT = 256;
+
+  /**
+   * Number of seconds to keep the Thread alive when it is idle.
+   */
+  public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE =
+      "dfs.cblock.cache.keep.alive";
+  public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT = "60s";
+
+  /**
+   * Priority of cache flusher thread, affecting the relative performance of
+   * write and read.
+   */
+  public static final String DFS_CBLOCK_CACHE_THREAD_PRIORITY =
+      "dfs.cblock.cache.thread.priority";
+  public static final int DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT =
+      NORM_PRIORITY;
+
+  /**
+   * Block Buffer size in terms of blockID entries, 512 means 512 blockIDs.
+   */
+  public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE =
+      "dfs.cblock.cache.block.buffer.size";
+  public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
+
+  public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL =
+      "dfs.cblock.block.buffer.flush.interval";
+  public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT =
+      "60s";
+
+  // jscsi server settings
+  public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY =
+      "dfs.cblock.jscsi.server.address";
+  public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT =
+      "0.0.0.0";
+  public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY =
+      "dfs.cblock.jscsi.cblock.server.address";
+  public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT =
+      "127.0.0.1";
+
+  // The address the CBlock server uses to talk to SCM.
+  public static final String DFS_CBLOCK_SCM_IPADDRESS_KEY =
+      "dfs.cblock.scm.ipaddress";
+  public static final String DFS_CBLOCK_SCM_IPADDRESS_DEFAULT =
+      "127.0.0.1";
+  public static final String DFS_CBLOCK_SCM_PORT_KEY =
+      "dfs.cblock.scm.port";
+  public static final int DFS_CBLOCK_SCM_PORT_DEFAULT = 9860;
+
+  public static final String DFS_CBLOCK_CONTAINER_SIZE_GB_KEY =
+      "dfs.cblock.container.size.gb";
+  public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT =
+      5;
+
+  // The LevelDB cache file uses a 256 MB off-heap cache.
+  public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY =
+      "dfs.cblock.cache.leveldb.cache.size.mb";
+  public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
+
+  /**
+   * The cache makes a best-effort attempt to write a block to a container.
+   * At some point we will need to handle the case where we have tried
+   * 64K times and are still unable to write to the container.
+   *
+   * TODO: We will need the cBlock server to allow us to remap the
+   * block location in case of failures; at that point we should reduce the
+   * retry count to a more normal number. 64K retries is approximately 18
+   * hours of retrying.
+   */
+  public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY =
+      "dfs.cblock.cache.max.retry";
+  public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
+      64 * 1024;
+
+  /**
+   * Cblock CLI configs.
+   */
+  public static final String DFS_CBLOCK_MANAGER_POOL_SIZE =
+      "dfs.cblock.manager.pool.size";
+  public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16;
+
+  /**
+   * Currently the largest supported volume is about 8TB, which might take
+   * more than 20 seconds to finish creating containers; thus the timeout
+   * default below is generous.
+   */
+  public static final String DFS_CBLOCK_RPC_TIMEOUT =
+      "dfs.cblock.rpc.timeout";
+  public static final String DFS_CBLOCK_RPC_TIMEOUT_DEFAULT = "300s";
+
+  public static final String DFS_CBLOCK_ISCSI_ADVERTISED_IP =
+      "dfs.cblock.iscsi.advertised.ip";
+
+  public static final String DFS_CBLOCK_ISCSI_ADVERTISED_PORT =
+      "dfs.cblock.iscsi.advertised.port";
+
+  public static final int DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT = 3260;
+
+
+  public static final String
+      DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED
+        = "dfs.cblock.kubernetes.dynamic-provisioner.enabled";
+
+  public static final boolean
+      DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT = false;
+
+  public static final String
+      DFS_CBLOCK_KUBERNETES_CBLOCK_USER =
+         "dfs.cblock.kubernetes.cblock-user";
+
+  public static final String
+      DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT =
+         "iqn.2001-04.org.apache.hadoop";
+
+  public static final String
+      DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY =
+         "dfs.cblock.kubernetes.configfile";
+
+  private CBlockConfigKeys() {
+
+  }
+}
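
For reference, every key above follows the standard Hadoop Configuration
pattern of a string key plus a typed default, which is how CBlockManager
consumes them below. A minimal sketch of that usage (illustrative only; it
assumes nothing beyond OzoneConfiguration and the constants in this file):

    import org.apache.hadoop.cblock.CBlockConfigKeys;
    import org.apache.hadoop.hdsl.conf.OzoneConfiguration;

    public class CBlockConfigExample {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Override the service RPC endpoint; unset keys fall back to defaults.
        conf.set(CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY,
            "0.0.0.0:9810");
        int handlers = conf.getInt(
            CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
            CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
        boolean traceIO = conf.getBoolean(
            CBlockConfigKeys.DFS_CBLOCK_TRACE_IO,
            CBlockConfigKeys.DFS_CBLOCK_TRACE_IO_DEFAULT);
        System.out.println("handlers=" + handlers + " traceIO=" + traceIO);
      }
    }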

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
new file mode 100644
index 0000000..788d01e
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.cblock.kubernetes.DynamicProvisioner;
+import org.apache.hadoop.cblock.meta.VolumeDescriptor;
+import org.apache.hadoop.cblock.meta.VolumeInfo;
+import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
+import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
+import org.apache.hadoop.cblock.proto.MountVolumeResponse;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos;
+import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
+import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
+import org.apache.hadoop.cblock.protocolPB
+    .CBlockClientServerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
+import org.apache.hadoop.cblock.protocolPB
+    .CBlockServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.client.ContainerOperationClient;
+import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.cblock.storage.StorageManager;
+import org.apache.hadoop.cblock.util.KeyUtil;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.utils.LevelDBStore;
+
+import static org.apache.hadoop.cblock.CblockUtils.getCblockServerRpcAddr;
+import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr;
+import static org.apache.hadoop.ozone.web.util.ServerUtils
+    .updateRPCListenAddress;
+import org.iq80.leveldb.DBIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_IPADDRESS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_IPADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_PORT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_PORT_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT;
+
+
+/**
+ * The main entry point of CBlock operations; ALL CBlock operations
+ * go through this class. But NOTE that:
+ *
+ * Volume operations (create/delete/info) are:
+ *    client -> CBlockManager -> StorageManager -> CBlock client
+ *
+ * IO operations (put/get block) are:
+ *    client -> CBlock client -> container
+ *
+ */
+public class CBlockManager implements CBlockServiceProtocol,
+    CBlockClientProtocol {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CBlockManager.class);
+
+  private final RPC.Server cblockService;
+  private final RPC.Server cblockServer;
+
+  private final StorageManager storageManager;
+
+  private final LevelDBStore levelDBStore;
+  private final String dbPath;
+
+  private final DynamicProvisioner kubernetesDynamicProvisioner;
+
+  private Charset encoding = Charset.forName("UTF-8");
+
+  public CBlockManager(OzoneConfiguration conf,
+      ScmClient storageClient) throws IOException {
+    // Fix the cBlockManagerId generation code here. Should support
+    // cBlockManager --init command which will generate a cBlockManagerId and
+    // persist it locally.
+    storageManager =
+        new StorageManager(storageClient, conf, "CBLOCK");
+
+    dbPath = conf.getTrimmed(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY,
+        DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT);
+    levelDBStore = new LevelDBStore(new File(dbPath), true);
+    LOG.info("Try to load exising volume information");
+    readFromPersistentStore();
+
+    RPC.setProtocolEngine(conf, CBlockServiceProtocolPB.class,
+        ProtobufRpcEngine.class);
+    RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    // start service for client command-to-cblock server service
+    InetSocketAddress serviceRpcAddr =
+        getCblockServiceRpcAddr(conf);
+    BlockingService cblockProto =
+        CBlockServiceProtocolProtos
+            .CBlockServiceProtocolService
+            .newReflectiveBlockingService(
+                new CBlockServiceProtocolServerSideTranslatorPB(this)
+            );
+    cblockService = startRpcServer(conf, CBlockServiceProtocolPB.class,
+        cblockProto, serviceRpcAddr,
+        DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY,
+        DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
+        DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
+    InetSocketAddress cblockServiceRpcAddress =
+        updateRPCListenAddress(conf,
+            DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService);
+    LOG.info("CBlock manager listening for client commands on: {}",
+        cblockServiceRpcAddress);
+    // now start service for cblock client-to-cblock server communication
+
+    InetSocketAddress serverRpcAddr =
+        getCblockServerRpcAddr(conf);
+    BlockingService serverProto =
+        CBlockClientServerProtocolProtos
+            .CBlockClientServerProtocolService
+            .newReflectiveBlockingService(
+                new CBlockClientServerProtocolServerSideTranslatorPB(this)
+            );
+    cblockServer = startRpcServer(
+        conf, CBlockClientServerProtocolPB.class,
+        serverProto, serverRpcAddr,
+        DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY,
+        DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
+        DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
+    InetSocketAddress cblockServerRpcAddress =
+        updateRPCListenAddress(conf,
+            DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer);
+    LOG.info("CBlock server listening for client commands on: {}",
+        cblockServerRpcAddress);
+
+    if (conf.getBoolean(DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED,
+        DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT)) {
+
+      kubernetesDynamicProvisioner =
+          new DynamicProvisioner(conf, storageManager);
+      kubernetesDynamicProvisioner.init();
+
+    } else {
+      kubernetesDynamicProvisioner = null;
+    }
+  }
+
+  public void start() {
+    cblockService.start();
+    cblockServer.start();
+    if (kubernetesDynamicProvisioner != null) {
+      kubernetesDynamicProvisioner.start();
+    }
+    LOG.info("CBlock manager started!");
+  }
+
+  public void stop() {
+    cblockService.stop();
+    cblockServer.stop();
+    if (kubernetesDynamicProvisioner != null) {
+      kubernetesDynamicProvisioner.stop();
+    }
+  }
+
+  public void join() {
+    try {
+      cblockService.join();
+      cblockServer.join();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted during join");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts an RPC server, if configured.
+   *
+   * @param conf configuration
+   * @param protocol RPC protocol provided by RPC server
+   * @param instance RPC protocol implementation instance
+   * @param addr configured address of RPC server
+   * @param bindHostKey configuration key for setting explicit bind host.  If
+   *     the property is not configured, then the bind host is taken from addr.
+   * @param handlerCountKey configuration key for RPC server handler count
+   * @param handlerCountDefault default RPC server handler count if unconfigured
+   * @return RPC server, or null if addr is null
+   * @throws IOException if there is an I/O error while creating RPC server
+   */
+  private static RPC.Server startRpcServer(OzoneConfiguration conf,
+      Class<?> protocol, BlockingService instance,
+      InetSocketAddress addr, String bindHostKey,
+      String handlerCountKey, int handlerCountDefault) throws IOException {
+    if (addr == null) {
+      return null;
+    }
+    String bindHost = conf.getTrimmed(bindHostKey);
+    if (bindHost == null || bindHost.isEmpty()) {
+      bindHost = addr.getHostName();
+    }
+    int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault);
+    RPC.Server rpcServer = new RPC.Builder(conf)
+        .setProtocol(protocol)
+        .setInstance(instance)
+        .setBindAddress(bindHost)
+        .setPort(addr.getPort())
+        .setNumHandlers(numHandlers)
+        .setVerbose(false)
+        .setSecretManager(null)
+        .build();
+    return rpcServer;
+  }
+
+  @Override
+  public synchronized MountVolumeResponse mountVolume(
+      String userName, String volumeName) throws IOException {
+    return storageManager.isVolumeValid(userName, volumeName);
+  }
+
+  @Override
+  public List<VolumeInfo> listVolumes() throws IOException {
+    return listVolume(null);
+  }
+
+  @Override
+  public synchronized void createVolume(String userName, String volumeName,
+      long volumeSize, int blockSize) throws IOException {
+    LOG.info("Create volume received: userName: {} volumeName: {} " +
+            "volumeSize: {} blockSize: {}", userName, volumeName,
+        volumeSize, blockSize);
+    // It is important to create the in-memory representation of the
+    // volume first, then write to persistent storage (levelDB),
+    // so that it is guaranteed that when there is an entry in
+    // levelDB, the volume is allocated (much like an UNDO log).
+    // TODO: what if creation failed? We allocated containers but lost
+    // the reference to the volume and all its containers. How to release
+    // the containers?
+    storageManager.createVolume(userName, volumeName, volumeSize, blockSize);
+    VolumeDescriptor volume = storageManager.getVolume(userName, volumeName);
+    if (volume == null) {
+      throw new IOException("Volume creation failed!");
+    }
+    String volumeKey = KeyUtil.getVolumeKey(userName, volumeName);
+    writeToPersistentStore(volumeKey.getBytes(encoding),
+        volume.toProtobuf().toByteArray());
+  }
+
+  @Override
+  public synchronized void deleteVolume(String userName,
+      String volumeName, boolean force) throws IOException {
+    LOG.info("Delete volume received: volume: {} {} ", volumeName, force);
+    storageManager.deleteVolume(userName, volumeName, force);
+    // being here means volume is successfully deleted now
+    String volumeKey = KeyUtil.getVolumeKey(userName, volumeName);
+    removeFromPersistentStore(volumeKey.getBytes(encoding));
+  }
+
+  // No need to synchronize the following three methods, since write's and
+  // remove's callers are synchronized, and read's caller is the constructor,
+  // when no other method call can happen.
+  @VisibleForTesting
+  public void writeToPersistentStore(byte[] key, byte[] value) {
+    levelDBStore.put(key, value);
+  }
+
+  @VisibleForTesting
+  public void removeFromPersistentStore(byte[] key) {
+    levelDBStore.delete(key);
+  }
+
+  public void readFromPersistentStore() throws IOException {
+    try (DBIterator iter = levelDBStore.getIterator()) {
+      iter.seekToFirst();
+      while (iter.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = iter.next();
+        String volumeKey = new String(entry.getKey(), encoding);
+        try {
+          VolumeDescriptor volumeDescriptor =
+              VolumeDescriptor.fromProtobuf(entry.getValue());
+          storageManager.addVolume(volumeDescriptor);
+        } catch (IOException e) {
+          LOG.error("Loading volume " + volumeKey + " error " + e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public synchronized VolumeInfo infoVolume(String userName,
+      String volumeName) throws IOException {
+    LOG.info("Info volume received: volume: {}", volumeName);
+    return storageManager.infoVolume(userName, volumeName);
+  }
+
+  @VisibleForTesting
+  public synchronized List<VolumeDescriptor> getAllVolumes() {
+    return storageManager.getAllVolume(null);
+  }
+
+  public synchronized List<VolumeDescriptor> getAllVolumes(String userName) {
+    return storageManager.getAllVolume(userName);
+  }
+
+  public synchronized void close() {
+    try {
+      levelDBStore.close();
+    } catch (IOException e) {
+      LOG.error("Error when closing levelDB " + e);
+    }
+  }
+
+  public synchronized void clean() {
+    try {
+      levelDBStore.close();
+      levelDBStore.destroy();
+    } catch (IOException e) {
+      LOG.error("Error when deleting levelDB " + e);
+    }
+  }
+
+  @Override
+  public synchronized List<VolumeInfo> listVolume(String userName)
+      throws IOException {
+    ArrayList<VolumeInfo> response = new ArrayList<>();
+    List<VolumeDescriptor> allVolumes =
+        storageManager.getAllVolume(userName);
+    for (VolumeDescriptor volume : allVolumes) {
+      VolumeInfo info =
+          new VolumeInfo(volume.getUserName(), volume.getVolumeName(),
+              volume.getVolumeSize(), volume.getBlockSize());
+      response.add(info);
+    }
+    return response;
+  }
+
+  public static void main(String[] args) throws Exception {
+    long version = RPC.getProtocolVersion(
+        StorageContainerLocationProtocolPB.class);
+    CblockUtils.activateConfigs();
+    OzoneConfiguration ozoneConf = new OzoneConfiguration();
+    String scmAddress = ozoneConf.get(DFS_CBLOCK_SCM_IPADDRESS_KEY,
+        DFS_CBLOCK_SCM_IPADDRESS_DEFAULT);
+    int scmPort = ozoneConf.getInt(DFS_CBLOCK_SCM_PORT_KEY,
+        DFS_CBLOCK_SCM_PORT_DEFAULT);
+    int containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY,
+        DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT);
+    ContainerOperationClient
+        .setContainerSizeB(containerSizeGB * OzoneConsts.GB);
+    InetSocketAddress address = new InetSocketAddress(scmAddress, scmPort);
+
+    ozoneConf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
+        OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
+    LOG.info(
+        "Creating StorageContainerLocationProtocol RPC client with address {}",
+        address);
+    RPC.setProtocolEngine(ozoneConf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    StorageContainerLocationProtocolClientSideTranslatorPB client =
+        new StorageContainerLocationProtocolClientSideTranslatorPB(
+            RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
+                address, UserGroupInformation.getCurrentUser(), ozoneConf,
+                NetUtils.getDefaultSocketFactory(ozoneConf),
+                Client.getRpcTimeout(ozoneConf)));
+    ScmClient storageClient = new ContainerOperationClient(
+        client, new XceiverClientManager(ozoneConf));
+    CBlockManager cbm = new CBlockManager(ozoneConf, storageClient);
+    cbm.start();
+    cbm.join();
+  }
+}
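
The main() above wires an SCM client, constructs the manager, then calls
start() and join(). A hedged sketch of the same lifecycle factored as a
reusable helper (the ScmClient argument stands in for the SCM wiring shown
in main(); this helper is illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.cblock.CBlockManager;
    import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
    import org.apache.hadoop.scm.client.ScmClient;

    public class CBlockManagerLifecycle {
      /** Start a manager, wait for its RPC servers, then release resources. */
      public static void run(OzoneConfiguration conf, ScmClient scmClient)
          throws IOException {
        CBlockManager manager = new CBlockManager(conf, scmClient);
        manager.start();  // both RPC servers, plus the provisioner if enabled
        try {
          manager.join(); // blocks until the servers stop
        } finally {
          manager.stop();
          manager.close(); // closes the levelDB volume store
        }
      }
    }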

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java
new file mode 100644
index 0000000..99ffde0
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.cblock;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.common.base.Optional;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSI_PORT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
+import static org.apache.hadoop.hdsl.HdslUtils.getHostNameFromConfigKeys;
+import static org.apache.hadoop.hdsl.HdslUtils.getPortNumberFromConfigKeys;
+
+/**
+ * Generic stateless utility functions for CBlock components.
+ */
+public class CblockUtils {
+
+  private CblockUtils() {
+  }
+
+  /**
+   * Retrieve the socket address that is used by the CBlock Service.
+   *
+   * @param conf configuration to read the address from.
+   * @return Target InetSocketAddress for the CBlock Service endpoint.
+   */
+  public static InetSocketAddress getCblockServiceRpcAddr(Configuration conf) {
+    final Optional<String> host =
+        getHostNameFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
+
+    // If no port number is specified then we'll just try the defaultBindPort.
+    final Optional<Integer> port =
+        getPortNumberFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port
+            .or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
+  }
+
+  /**
+   * Retrieve the socket address that is used by the CBlock Server.
+   *
+   * @param conf configuration to read the address from.
+   * @return Target InetSocketAddress for the CBlock Server endpoint.
+   */
+  public static InetSocketAddress getCblockServerRpcAddr(Configuration conf) {
+    final Optional<String> host =
+        getHostNameFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
+
+    // If no port number is specified then we'll just try the defaultBindPort.
+    final Optional<Integer> port =
+        getPortNumberFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port
+            .or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
+  }
+
+  /**
+   * Parse a size string with a unit suffix (e.g. "100GB") and return the
+   * size in bytes.
+   */
+  public static long parseSize(String volumeSizeArgs) throws IOException {
+    long multiplier = 1;
+
+    Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)");
+    Matcher m = p.matcher(volumeSizeArgs);
+
+    if (!m.find()) {
+      throw new IOException("Invalid volume size args " + volumeSizeArgs);
+    }
+
+    int size = Integer.parseInt(m.group(1));
+    String s = m.group(2);
+
+    if (s.equalsIgnoreCase("MB") ||
+        s.equalsIgnoreCase("Mi")) {
+      multiplier = 1024L * 1024;
+    } else if (s.equalsIgnoreCase("GB") ||
+        s.equalsIgnoreCase("Gi")) {
+      multiplier = 1024L * 1024 * 1024;
+    } else if (s.equalsIgnoreCase("TB") ||
+        s.equalsIgnoreCase("Ti")) {
+      multiplier = 1024L * 1024 * 1024 * 1024;
+    } else {
+      throw new IOException("Invalid volume size args " + volumeSizeArgs);
+    }
+    return size * multiplier;
+  }
+
+  public static void activateConfigs() {
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+    Configuration.addDefaultResource("ozone-default.xml");
+    Configuration.addDefaultResource("ozone-site.xml");
+    Configuration.addDefaultResource("cblock-default.xml");
+    Configuration.addDefaultResource("cblock-site.xml");
+  }
+}
\ No newline at end of file
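
Note on parseSize(): the regex accepts an integer followed by a unit, and all
multipliers are binary (Mi/MB = 1024^2, Gi/GB = 1024^3, Ti/TB = 1024^4). A
short illustrative sketch (only CblockUtils is from the patch; the snippet
belongs inside a method that may throw IOException):

    // Both the decimal-looking and IEC-style suffixes use binary multipliers.
    long oneGig = CblockUtils.parseSize("1GB");  // 1073741824 bytes
    long tenMi = CblockUtils.parseSize("10Mi");  // 10 * 1024 * 1024 bytes
    // An unrecognized unit such as "1KB" throws IOException.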

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..ec38c9c
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.cblock.meta.VolumeInfo;
+import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
+import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
+import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The client-side implementation of CBlockServiceProtocol.
+ */
+@InterfaceAudience.Private
+public final class CBlockServiceProtocolClientSideTranslatorPB
+    implements CBlockServiceProtocol, ProtocolTranslator, Closeable {
+
+  private final CBlockServiceProtocolPB rpcProxy;
+
+  public CBlockServiceProtocolClientSideTranslatorPB(
+      CBlockServiceProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public void createVolume(String userName, String volumeName,
+      long volumeSize, int blockSize) throws IOException {
+    CBlockServiceProtocolProtos.CreateVolumeRequestProto.Builder req =
+        CBlockServiceProtocolProtos.CreateVolumeRequestProto.newBuilder();
+    req.setUserName(userName);
+    req.setVolumeName(volumeName);
+    req.setVolumeSize(volumeSize);
+    req.setBlockSize(blockSize);
+    try {
+      rpcProxy.createVolume(null, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void deleteVolume(String userName, String volumeName, boolean force)
+      throws IOException {
+    CBlockServiceProtocolProtos.DeleteVolumeRequestProto.Builder req =
+        CBlockServiceProtocolProtos.DeleteVolumeRequestProto.newBuilder();
+    req.setUserName(userName);
+    req.setVolumeName(volumeName);
+    req.setForce(force);
+    try {
+      rpcProxy.deleteVolume(null, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+
+  @Override
+  public VolumeInfo infoVolume(String userName, String volumeName)
+      throws IOException {
+    CBlockServiceProtocolProtos.InfoVolumeRequestProto.Builder req =
+        CBlockServiceProtocolProtos.InfoVolumeRequestProto.newBuilder();
+    req.setUserName(userName);
+    req.setVolumeName(volumeName);
+    try {
+      CBlockServiceProtocolProtos.InfoVolumeResponseProto resp =
+          rpcProxy.infoVolume(null, req.build());
+      return new VolumeInfo(resp.getVolumeInfo().getUserName(),
+          resp.getVolumeInfo().getVolumeName(),
+          resp.getVolumeInfo().getVolumeSize(),
+          resp.getVolumeInfo().getBlockSize(),
+          resp.getVolumeInfo().getUsage());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public List<VolumeInfo> listVolume(String userName) throws IOException {
+    CBlockServiceProtocolProtos.ListVolumeRequestProto.Builder req =
+        CBlockServiceProtocolProtos.ListVolumeRequestProto.newBuilder();
+    if (userName != null) {
+      req.setUserName(userName);
+    }
+    try {
+      CBlockServiceProtocolProtos.ListVolumeResponseProto resp =
+          rpcProxy.listVolume(null, req.build());
+      List<VolumeInfo> respList = new ArrayList<>();
+      for (CBlockServiceProtocolProtos.VolumeInfoProto entry :
+          resp.getVolumeEntryList()) {
+        VolumeInfo volumeInfo = new VolumeInfo(
+            entry.getUserName(), entry.getVolumeName(), entry.getVolumeSize(),
+            entry.getBlockSize());
+        respList.add(volumeInfo);
+      }
+      return respList;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    } catch (Exception e) {
+      throw new IOException("got" + e.getCause() + " " + e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
new file mode 100644
index 0000000..4a3878a
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.cblock.CBlockConfigKeys;
+import org.apache.hadoop.cblock.meta.VolumeInfo;
+import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
+import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr;
+
+/**
+ * Implementation of client used by CBlock command line tool.
+ */
+public class CBlockVolumeClient {
+  private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;
+
+  public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
+    this(conf, null);
+  }
+
+  public CBlockVolumeClient(OzoneConfiguration conf,
+      InetSocketAddress serverAddress) throws IOException {
+    InetSocketAddress address = serverAddress != null ? serverAddress :
+        getCblockServiceRpcAddr(conf);
+    long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
+    int rpcTimeout = Math.toIntExact(
+        conf.getTimeDuration(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT,
+            CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
+        RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
+            address, UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies
+                .retryUpToMaximumCountWithFixedSleep(
+                    300, 1, TimeUnit.SECONDS)).getProxy());
+  }
+
+  public void createVolume(String userName, String volumeName,
+      long volumeSize, int blockSize) throws IOException {
+    cblockClient.createVolume(userName, volumeName,
+        volumeSize, blockSize);
+  }
+
+  public void deleteVolume(String userName, String volumeName, boolean force)
+      throws IOException {
+    cblockClient.deleteVolume(userName, volumeName, force);
+  }
+
+  public VolumeInfo infoVolume(String userName, String volumeName)
+      throws IOException {
+    return cblockClient.infoVolume(userName, volumeName);
+  }
+
+  public List<VolumeInfo> listVolume(String userName)
+      throws IOException {
+    return cblockClient.listVolume(userName);
+  }
+}
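
A hedged end-to-end sketch of driving this client (the user name, volume
name, and size are illustrative; the CBlock service address comes from
configuration, as the constructor above shows):

    import java.io.IOException;
    import org.apache.hadoop.cblock.CblockUtils;
    import org.apache.hadoop.cblock.client.CBlockVolumeClient;
    import org.apache.hadoop.cblock.meta.VolumeInfo;
    import org.apache.hadoop.hdsl.conf.OzoneConfiguration;

    public class VolumeClientExample {
      public static void main(String[] args) throws IOException {
        OzoneConfiguration conf = new OzoneConfiguration();
        CBlockVolumeClient client = new CBlockVolumeClient(conf);
        // Create a 1 GB volume with a 4 KB block size for user "hadoop".
        client.createVolume("hadoop", "vol-1",
            CblockUtils.parseSize("1GB"), 4096);
        for (VolumeInfo info : client.listVolume("hadoop")) {
          System.out.println(info);
        }
      }
    }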

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java
new file mode 100644
index 0000000..761b71e
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.client;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java
new file mode 100644
index 0000000..8f6b82b
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock.exception;
+
+import java.io.IOException;
+
+/**
+ * The exception class used in CBlock.
+ */
+public class CBlockException extends IOException {
+  public CBlockException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java
new file mode 100644
index 0000000..268b8cb
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.exception;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
new file mode 100644
index 0000000..04fe3a4
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+
+/**
+ * The block writer task: writes a single cached block to its container.
+ */
+public class BlockWriterTask implements Runnable {
+  private final LogicalBlock block;
+  private int tryCount;
+  private final ContainerCacheFlusher flusher;
+  private final String dbPath;
+  private final String fileName;
+  private final int maxRetryCount;
+
+  /**
+   * Constructs a BlockWriterTask.
+   *
+   * @param block - Block Information.
+   * @param flusher - ContainerCacheFlusher.
+   */
+  public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher,
+      String dbPath, int tryCount, String fileName, int maxRetryCount) {
+    this.block = block;
+    this.flusher = flusher;
+    this.dbPath = dbPath;
+    this.tryCount = tryCount;
+    this.fileName = fileName;
+    this.maxRetryCount = maxRetryCount;
+  }
+
+  /**
+   * When an object implementing interface <code>Runnable</code> is used
+   * to create a thread, starting the thread causes the object's
+   * <code>run</code> method to be called in that separately executing
+   * thread.
+   * <p>
+   * The general contract of the method <code>run</code> is that it may
+   * take any action whatsoever.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    String containerName = null;
+    XceiverClientSpi client = null;
+    LevelDBStore levelDBStore = null;
+    String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
+    flusher.getLOG().debug(
+        "Writing block to remote. block ID: {}", block.getBlockID());
+    try {
+      incTryCount();
+      Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID());
+      client = flusher.getXceiverClientManager().acquireClient(pipeline);
+      containerName = pipeline.getContainerName();
+      byte[] keybuf = Longs.toByteArray(block.getBlockID());
+      levelDBStore = flusher.getCacheDB(this.dbPath);
+      byte[] data = levelDBStore.get(keybuf);
+      Preconditions.checkNotNull(data);
+      Preconditions.checkState(data.length > 0, "Block data is zero length");
+      long startTime = Time.monotonicNow();
+      ContainerProtocolCalls.writeSmallFile(client, containerName,
+          Long.toString(block.getBlockID()), data, traceID);
+      long endTime = Time.monotonicNow();
+      flusher.getTargetMetrics().updateContainerWriteLatency(
+          endTime - startTime);
+      flusher.getLOG().debug("Time taken for Write Small File : {} ms",
+          endTime - startTime);
+
+      flusher.incrementRemoteIO();
+
+    } catch (Exception ex) {
+      flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
+              "to write this block {} times to the container {}.Trace ID:{}",
+          block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
+      writeRetryBlock(block);
+      if (ex instanceof IOException) {
+        flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
+      } else {
+        flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
+      }
+      if (this.getTryCount() >= maxRetryCount) {
+        flusher.getTargetMetrics().incNumWriteMaxRetryBlocks();
+      }
+    } finally {
+      flusher.incFinishCount(fileName);
+      if (levelDBStore != null) {
+        flusher.releaseCacheDB(dbPath);
+      }
+      if (client != null) {
+        flusher.getXceiverClientManager().releaseClient(client);
+      }
+    }
+  }
+
+  private void writeRetryBlock(LogicalBlock currentBlock) {
+    boolean append = false;
+    String retryFileName =
+        String.format("%s.%d.%s.%s", AsyncBlockWriter.RETRY_LOG_PREFIX,
+            currentBlock.getBlockID(), Time.monotonicNow(), tryCount);
+    File logDir = new File(this.dbPath);
+    if (!logDir.exists() && !logDir.mkdirs()) {
+      flusher.getLOG().error(
+          "Unable to create the log directory; critical error, cannot " +
+              "continue.");
+      return;
+    }
+    String log = Paths.get(this.dbPath, retryFileName).toString();
+    ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
+    buffer.putLong(currentBlock.getBlockID());
+    buffer.flip();
+    try {
+      // Close the channel before notifying the flusher, and close it even
+      // if the write fails.
+      try (FileChannel channel =
+          new FileOutputStream(log, append).getChannel()) {
+        channel.write(buffer);
+      }
+      flusher.processDirtyBlocks(this.dbPath, retryFileName);
+    } catch (IOException e) {
+      flusher.getTargetMetrics().incNumFailedRetryLogFileWrites();
+      flusher.getLOG().error("Unable to write the retry block. Block ID: {}",
+          currentBlock.getBlockID(), e);
+    }
+  }
+
+  /**
+   * Increments the try count. This is done each time we try this block
+   * write to the container.
+   */
+  private void incTryCount() {
+    tryCount++;
+  }
+
+  /**
+   * Gets the try count.
+   *
+   * @return int
+   */
+  public int getTryCount() {
+    return tryCount;
+  }
+}
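
The retry path above persists only the 8-byte block ID, written big-endian by
ByteBuffer.putLong, into a file named
<RETRY_LOG_PREFIX>.<blockID>.<timestamp>.<tryCount>. A minimal sketch of
reading such an entry back, assuming a hypothetical file name and path (the
real recovery path goes through ContainerCacheFlusher.processDirtyBlocks):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public final class RetryLogReaderSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical retry-log file written by writeRetryBlock above.
        Path log = Paths.get("/tmp/cblockCacheDB", "retryLog.42.1234.1");
        ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
        try (FileChannel channel =
            FileChannel.open(log, StandardOpenOption.READ)) {
          // Each entry is one long; a short read means end of file.
          while (channel.read(buffer) == buffer.capacity()) {
            buffer.flip();
            long blockId = buffer.getLong(); // big-endian by default
            System.out.println("Block queued for retry: " + blockId);
            buffer.clear();
          }
        }
      }
    }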

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..84b68e3
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.cblock.exception.CBlockException;
+import org.apache.hadoop.cblock.meta.VolumeInfo;
+import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
+import org.apache.hadoop.cblock.proto.MountVolumeResponse;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.ContainerIDProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.ListVolumesRequestProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.ListVolumesResponseProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.MountVolumeRequestProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.MountVolumeResponseProto;
+import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos
+    .VolumeInfoProto;
+import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * The client side of CBlockClientProtocol.
+ *
+ * CBlockClientProtocol is the protocol used between the cblock client side
+ * and the cblock manager (the cblock client side is the node where the jscsi
+ * daemon process runs; a machine talks to the jscsi daemon to mount a
+ * volume).
+ *
+ * Right now, the only communication carried by this protocol is the client
+ * side requesting to mount a volume.
+ */
+public class CBlockClientProtocolClientSideTranslatorPB
+    implements CBlockClientProtocol, ProtocolTranslator, Closeable {
+
+  private final CBlockClientServerProtocolPB rpcProxy;
+
+  public CBlockClientProtocolClientSideTranslatorPB(
+      CBlockClientServerProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+
+  @Override
+  public MountVolumeResponse mountVolume(
+      String userName, String volumeName) throws IOException {
+    MountVolumeRequestProto.Builder request =
+        MountVolumeRequestProto.newBuilder();
+    request.setUserName(userName);
+    request.setVolumeName(volumeName);
+    try {
+      MountVolumeResponseProto resp
+          = rpcProxy.mountVolume(null, request.build());
+      if (!resp.getIsValid()) {
+        throw new CBlockException(
+            "Not a valid volume: " + userName + ":" + volumeName);
+      }
+      List<Pipeline> containerIDs = new ArrayList<>();
+      HashMap<String, Pipeline> containerPipelines = new HashMap<>();
+      if (resp.getAllContainerIDsList().isEmpty()) {
+        throw new CBlockException(
+            "Mount volume request returned no container");
+      }
+      for (ContainerIDProto containerID :
+          resp.getAllContainerIDsList()) {
+        if (containerID.hasPipeline()) {
+          // it should always have a pipeline, except in tests.
+          Pipeline p = Pipeline.getFromProtoBuf(containerID.getPipeline());
+          p.setData(Longs.toByteArray(containerID.getIndex()));
+          containerIDs.add(p);
+          containerPipelines.put(containerID.getContainerID(), p);
+        } else {
+          throw new CBlockException("ContainerID does not have pipeline!");
+        }
+      }
+      return new MountVolumeResponse(
+          resp.getIsValid(),
+          resp.getUserName(),
+          resp.getVolumeName(),
+          resp.getVolumeSize(),
+          resp.getBlockSize(),
+          containerIDs,
+          containerPipelines);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public List<VolumeInfo> listVolumes() throws IOException {
+    try {
+      List<VolumeInfo> result = new ArrayList<>();
+      ListVolumesResponseProto listVolumesResponseProto =
+          this.rpcProxy.listVolumes(null,
+              ListVolumesRequestProto.newBuilder().build());
+      for (VolumeInfoProto volumeInfoProto :
+          listVolumesResponseProto.getVolumeEntryList()) {
+        result.add(new VolumeInfo(volumeInfoProto.getUserName(),
+            volumeInfoProto.getVolumeName(), volumeInfoProto.getVolumeSize(),
+            volumeInfoProto.getBlockSize()));
+      }
+      return result;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+}
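
Wiring this translator up follows the usual Hadoop protobuf RPC pattern. A
hypothetical sketch, assuming a placeholder manager address (the patch's own
wiring lives in the jscsi target server setup, not shown here) and assuming
VolumeInfo exposes getters matching its constructor arguments:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.cblock.jscsiHelper
        .CBlockClientProtocolClientSideTranslatorPB;
    import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;

    public final class CBlockClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
            ProtobufRpcEngine.class);
        // Placeholder address; the real one comes from cblock configuration.
        InetSocketAddress address =
            new InetSocketAddress("cblock-manager.example.com", 9810);
        CBlockClientServerProtocolPB proxy = RPC.getProxy(
            CBlockClientServerProtocolPB.class,
            RPC.getProtocolVersion(CBlockClientServerProtocolPB.class),
            address, conf);
        try (CBlockClientProtocolClientSideTranslatorPB client =
            new CBlockClientProtocolClientSideTranslatorPB(proxy)) {
          client.listVolumes().forEach(v ->
              System.out.println(v.getUserName() + ":" + v.getVolumeName()));
        }
      }
    }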

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java
new file mode 100644
index 0000000..2f35668
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.jscsi.target.storage.IStorageModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_TRACE_IO_DEFAULT;
+
+/**
+ * The SCSI Target class for CBlockSCSIServer.
+ */
+public final class CBlockIStorageImpl implements IStorageModule {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(CBlockIStorageImpl.class);
+  private static final Logger TRACER =
+      LoggerFactory.getLogger("TraceIO");
+
+  private CacheModule cache;
+  private final long volumeSize;
+  private final int blockSize;
+  private final String userName;
+  private final String volumeName;
+  private final boolean traceEnabled;
+  private final Configuration conf;
+  private final ContainerCacheFlusher flusher;
+  private List<Pipeline> fullContainerList;
+
+  /**
+   * private: constructs a SCSI Target.
+   *
+   * @param config - config
+   * @param userName - Username
+   * @param volumeName - Name of the volume
+   * @param volumeSize - Size of the volume
+   * @param blockSize - Size of the block
+   * @param fullContainerList - Ordered list of containers that make up this
+   * volume.
+   * @param flusher - flusher which is used to flush data from
+   *                  level db cache to containers
+   */
+  private CBlockIStorageImpl(Configuration config, String userName,
+      String volumeName, long volumeSize, int blockSize,
+      List<Pipeline> fullContainerList, ContainerCacheFlusher flusher) {
+    this.conf = config;
+    this.userName = userName;
+    this.volumeName = volumeName;
+    this.volumeSize = volumeSize;
+    this.blockSize = blockSize;
+    this.fullContainerList = new ArrayList<>(fullContainerList);
+    this.flusher = flusher;
+    this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO,
+        DFS_CBLOCK_TRACE_IO_DEFAULT);
+  }
+
+  /**
+   * private: initialize the cache.
+   *
+   * @param xceiverClientManager - client manager that is used for creating new
+   * connections to containers.
+   * @param metrics  - target metrics to maintain metrics for target server
+   * @throws IOException - Throws IOException.
+   */
+  private void initCache(XceiverClientManager xceiverClientManager,
+      CBlockTargetMetrics metrics) throws IOException {
+    this.cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(conf)
+        .setVolumeName(this.volumeName)
+        .setUserName(this.userName)
+        .setPipelines(this.fullContainerList)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(blockSize)
+        .setVolumeSize(volumeSize)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    this.cache.start();
+  }
+
+  /**
+   * Gets a new builder for CBlockIStorageImpl.
+   *
+   * @return builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Get Cache.
+   *
+   * @return - Cache
+   */
+  public CacheModule getCache() {
+    return cache;
+  }
+
+  /**
+   * Returns block size of this volume.
+   *
+   * @return int size of block for this volume.
+   */
+  @Override
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  /**
+   * Checks the index boundary of a block address.
+   *
+   * @param logicalBlockAddress the index of the first block of data to be read
+   * or written
+   * @param transferLengthInBlocks the total number of consecutive blocks about
+   * to be read or written
+   * @return 0 == Success, 1 indicates the LBA address is out of bounds and 2
+   * indicates that LBA + transfer size is out of bounds.
+   */
+  @Override
+  public int checkBounds(long logicalBlockAddress,
+      int transferLengthInBlocks) {
+    long sizeInBlocks = volumeSize / blockSize;
+    int res = 0;
+    if (logicalBlockAddress < 0 || logicalBlockAddress >= sizeInBlocks) {
+      res = 1;
+    }
+
+    if (transferLengthInBlocks < 0 ||
+        logicalBlockAddress + transferLengthInBlocks > sizeInBlocks) {
+      if (res == 0) {
+        res = 2;
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Number of blocks that make up this volume.
+   *
+   * @return long - count of blocks.
+   */
+  @Override
+  public long getSizeInBlocks() {
+    return volumeSize / blockSize;
+  }
+
+  /**
+   * Fills the given byte array with data read from storage, starting at the
+   * indicated location.
+   *
+   * @param bytes the array to be filled with data from storage
+   * @param storageIndex the position of the first byte to be copied
+   * @throws IOException if the read fails
+   */
+  @Override
+  public void read(byte[] bytes, long storageIndex) throws IOException {
+    int startingIdxInBlock = (int) (storageIndex % blockSize);
+    int idxInBytes = 0;
+    if (this.traceEnabled) {
+      TRACER.info("Task=ReadStart,length={},location={}",
+          bytes.length, storageIndex);
+    }
+    while (idxInBytes < bytes.length - 1) {
+      long blockId = (storageIndex + idxInBytes) / blockSize;
+      byte[] dataBytes;
+
+      try {
+        LogicalBlock block = this.cache.get(blockId);
+        dataBytes = block.getData().array();
+
+        if (this.traceEnabled) {
+          TRACER.info("Task=ReadBlock,BlockID={},length={},SHA={}",
+              blockId,
+              dataBytes.length,
+              dataBytes.length > 0 ? DigestUtils.sha256Hex(dataBytes) : null);
+        }
+      } catch (IOException e) {
+        // For a non-existing block, cache.get returns a zero-filled block,
+        // so any error here is a real error.
+        LOGGER.error("Error while reading data.", e);
+        throw e;
+      }
+
+      int length = blockSize - startingIdxInBlock;
+      if (length > bytes.length - idxInBytes) {
+        length = bytes.length - idxInBytes;
+      }
+      if (dataBytes.length >= length) {
+        System.arraycopy(dataBytes, startingIdxInBlock, bytes, idxInBytes,
+            length);
+      }
+      startingIdxInBlock = 0;
+      idxInBytes += length;
+    }
+    if (this.traceEnabled) {
+      TRACER.info("Task=ReadEnd,length={},location={},SHA={}",
+          bytes.length, storageIndex, DigestUtils.sha256Hex(bytes));
+    }
+  }
+
+  @Override
+  public void write(byte[] bytes, long storageIndex) throws IOException {
+    int startingIdxInBlock = (int) (storageIndex % blockSize);
+    int idxInBytes = 0;
+    if (this.traceEnabled) {
+      TRACER.info("Task=WriteStart,length={},location={},SHA={}",
+          bytes.length, storageIndex,
+          bytes.length > 0 ? DigestUtils.sha256Hex(bytes) : null);
+    }
+
+    ByteBuffer dataByte = ByteBuffer.allocate(blockSize);
+    while (idxInBytes < bytes.length - 1) {
+      long blockId = (storageIndex + idxInBytes) / blockSize;
+      int length = blockSize - startingIdxInBlock;
+      if (length > bytes.length - idxInBytes) {
+        length = bytes.length - idxInBytes;
+      }
+      System.arraycopy(bytes, idxInBytes, dataByte.array(), startingIdxInBlock,
+          length);
+      this.cache.put(blockId, dataByte.array());
+
+      if (this.traceEnabled) {
+        TRACER.info("Task=WriteBlock,BlockID={},length={},SHA={}",
+            blockId, dataByte.array().length,
+            dataByte.array().length > 0 ?
+                DigestUtils.sha256Hex(dataByte.array()) : null);
+      }
+      dataByte.clear();
+      startingIdxInBlock = 0;
+      idxInBytes += length;
+    }
+
+    if (this.traceEnabled) {
+      TRACER.info("Task=WriteEnd,length={},location={} ",
+          bytes.length, storageIndex);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      cache.close();
+    } catch (IllegalStateException ise) {
+      LOGGER.error("Can not close the storage {}", ise);
+      throw ise;
+    }
+  }
+
+  /**
+   * Builder class for CBlockIStorageImpl.
+   */
+  public static class Builder {
+    private String userName;
+    private String volumeName;
+    private long volumeSize;
+    private int blockSize;
+    private List<Pipeline> containerList;
+    private Configuration conf;
+    private XceiverClientManager clientManager;
+    private ContainerCacheFlusher flusher;
+    private CBlockTargetMetrics metrics;
+
+    /**
+     * Constructs a builder.
+     */
+    Builder() {
+    }
+
+    public Builder setFlusher(ContainerCacheFlusher cacheFlusher) {
+      this.flusher = cacheFlusher;
+      return this;
+    }
+
+    /**
+     * Set config.
+     *
+     * @param config - config
+     * @return Builder
+     */
+    public Builder setConf(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * Set user name.
+     *
+     * @param cblockUserName - user name
+     * @return Builder
+     */
+    public Builder setUserName(String cblockUserName) {
+      this.userName = cblockUserName;
+      return this;
+    }
+
+    /**
+     * Set volume name.
+     *
+     * @param cblockVolumeName - volume name
+     * @return Builder
+     */
+    public Builder setVolumeName(String cblockVolumeName) {
+      this.volumeName = cblockVolumeName;
+      return this;
+    }
+
+    /**
+     * Set volume size.
+     *
+     * @param cblockVolumeSize - volume size
+     * @return Builder
+     */
+    public Builder setVolumeSize(long cblockVolumeSize) {
+      this.volumeSize = cblockVolumeSize;
+      return this;
+    }
+
+    /**
+     * Set block size.
+     *
+     * @param cblockBlockSize - block size
+     * @return Builder
+     */
+    public Builder setBlockSize(int cblockBlockSize) {
+      this.blockSize = cblockBlockSize;
+      return this;
+    }
+
+    /**
+     * Set container list.
+     *
+     * @param cblockContainerList - the pipeline list
+     * @return Builder
+     */
+    public Builder setContainerList(List<Pipeline> cblockContainerList) {
+      this.containerList = cblockContainerList;
+      return this;
+    }
+
+    /**
+     * Set client manager.
+     *
+     * @param xceiverClientManager -- sets the client manager.
+     * @return Builder
+     */
+    public Builder setClientManager(
+        XceiverClientManager xceiverClientManager) {
+      this.clientManager = xceiverClientManager;
+      return this;
+    }
+
+    /**
+     * Set CBlock target metrics.
+     *
+     * @param targetMetrics - the cblock target metrics
+     * @return Builder
+     */
+    public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+      this.metrics = targetMetrics;
+      return this;
+    }
+
+    /**
+     * Builds the CBlockIStorageImpl.
+     *
+     * @return the CBlock SCSI target.
+     */
+    public CBlockIStorageImpl build() throws IOException {
+      if (StringUtils.isBlank(userName)) {
+        throw new IllegalArgumentException(
+            "User name cannot be null or empty.");
+      }
+      if (StringUtils.isBlank(volumeName)) {
+        throw new IllegalArgumentException(
+            "Volume name cannot be null or empty.");
+      }
+
+      if (volumeSize < 1) {
+        throw new IllegalArgumentException(
+            "Volume size cannot be negative or zero.");
+      }
+
+      if (blockSize < 1) {
+        throw new IllegalArgumentException(
+            "Block size cannot be negative or zero.");
+      }
+
+      if (containerList == null || containerList.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Container list cannot be null or empty.");
+      }
+      if (clientManager == null) {
+        throw new IllegalArgumentException("Client manager cannot be null");
+      }
+      if (conf == null) {
+        throw new IllegalArgumentException("Configuration cannot be null");
+      }
+
+      if (flusher == null) {
+        throw new IllegalArgumentException("Flusher Cannot be null.");
+      }
+      CBlockIStorageImpl impl = new CBlockIStorageImpl(this.conf,
+          this.userName, this.volumeName, this.volumeSize, this.blockSize,
+          this.containerList, this.flusher);
+      impl.initCache(this.clientManager, this.metrics);
+      return impl;
+    }
+  }
+}
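
The read and write loops above reduce every byte offset on the SCSI target to
a (blockId, offset-within-block) pair and copy at most one block per
iteration. A small self-contained sketch of that arithmetic, using an assumed
4 KB block size and made-up request values:

    public final class BlockMathSketch {
      public static void main(String[] args) {
        final int blockSize = 4096;        // assumed block size
        final long storageIndex = 10_000;  // byte offset into the volume
        final int requestLength = 6_000;   // bytes to transfer

        int startingIdxInBlock = (int) (storageIndex % blockSize);
        int idxInBytes = 0;
        while (idxInBytes < requestLength) {
          long blockId = (storageIndex + idxInBytes) / blockSize;
          int length = Math.min(blockSize - startingIdxInBlock,
              requestLength - idxInBytes);
          System.out.printf("block %d: %d bytes from in-block offset %d%n",
              blockId, length, startingIdxInBlock);
          startingIdxInBlock = 0;  // later blocks start at offset 0
          idxInBytes += length;
        }
        // Prints: block 2: 2288 bytes from 1808, block 3: 3712 bytes from 0.
      }
    }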

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java
new file mode 100644
index 0000000..6367c61
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import org.apache.hadoop.cblock.meta.VolumeInfo;
+import org.apache.hadoop.cblock.proto.MountVolumeResponse;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The handler used by the target server to communicate with the
+ * CBlockManager.
+ *
+ * More specifically, this class exposes local methods to the target server
+ * and makes the corresponding RPC calls to the CBlockManager.
+ */
+public class CBlockManagerHandler {
+
+  private final CBlockClientProtocolClientSideTranslatorPB handler;
+
+  public CBlockManagerHandler(
+      CBlockClientProtocolClientSideTranslatorPB handler) {
+    this.handler = handler;
+  }
+
+  public MountVolumeResponse mountVolume(
+      String userName, String volumeName) throws IOException {
+    return handler.mountVolume(userName, volumeName);
+  }
+
+  public List<VolumeInfo> listVolumes() throws IOException {
+    return handler.listVolumes();
+  }
+}
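
A hypothetical usage of the handler, reusing the proxy wiring from the
earlier translator sketch and assuming the obvious getters on
MountVolumeResponse:

    // 'proxy' built as in the CBlockClientSketch above.
    CBlockManagerHandler manager = new CBlockManagerHandler(
        new CBlockClientProtocolClientSideTranslatorPB(proxy));
    MountVolumeResponse mount = manager.mountVolume("hdfs", "volume1");
    System.out.println("Mounted " + mount.getVolumeName()
        + " with block size " + mount.getBlockSize());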

