http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
new file mode 100644
index 0000000..e7df0cf
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * This class maintains the various CBlock target statistics
+ * and publishes them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ *
+ * Stats include the cache hit and miss ratio
+ * as well as the latency of read and write ops.
+ */
+public class CBlockTargetMetrics {
+  // IOPS based Metrics
+  @Metric private MutableCounterLong numReadOps;
+  @Metric private MutableCounterLong numWriteOps;
+  @Metric private MutableCounterLong numReadCacheHits;
+  @Metric private MutableCounterLong numReadCacheMiss;
+  @Metric private MutableCounterLong numDirectBlockWrites;
+
+  // Cblock internal Metrics
+  @Metric private MutableCounterLong numDirtyLogBlockRead;
+  @Metric private MutableCounterLong numBytesDirtyLogRead;
+  @Metric private MutableCounterLong numBytesDirtyLogWritten;
+  @Metric private MutableCounterLong numBlockBufferFlushCompleted;
+  @Metric private MutableCounterLong numBlockBufferFlushTriggered;
+  @Metric private MutableCounterLong numBlockBufferUpdates;
+  @Metric private MutableCounterLong numRetryLogBlockRead;
+  @Metric private MutableCounterLong numBytesRetryLogRead;
+
+  // Failure Metrics
+  @Metric private MutableCounterLong numReadLostBlocks;
+  @Metric private MutableCounterLong numFailedReadBlocks;
+  @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
+  @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
+  @Metric private MutableCounterLong numFailedDirectBlockWrites;
+  @Metric private MutableCounterLong numIllegalDirtyLogFiles;
+  @Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
+  @Metric private MutableCounterLong numFailedBlockBufferFlushes;
+  @Metric private MutableCounterLong numInterruptedBufferWaits;
+  @Metric private MutableCounterLong numFailedRetryLogFileWrites;
+  @Metric private MutableCounterLong numWriteMaxRetryBlocks;
+  @Metric private MutableCounterLong numFailedReleaseLevelDB;
+
+  // Latency based Metrics
+  @Metric private MutableRate dbReadLatency;
+  @Metric private MutableRate containerReadLatency;
+  @Metric private MutableRate dbWriteLatency;
+  @Metric private MutableRate containerWriteLatency;
+  @Metric private MutableRate blockBufferFlushLatency;
+  @Metric private MutableRate directBlockWriteLatency;
+
+  public CBlockTargetMetrics() {
+  }
+
+  public static CBlockTargetMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register("CBlockTargetMetrics",
+        "CBlock Target Metrics",
+        new CBlockTargetMetrics());
+  }
+
+  public void incNumReadOps() {
+    numReadOps.incr();
+  }
+
+  public void incNumWriteOps() {
+    numWriteOps.incr();
+  }
+
+  public void incNumReadCacheHits() {
+    numReadCacheHits.incr();
+  }
+
+  public void incNumReadCacheMiss() {
+    numReadCacheMiss.incr();
+  }
+
+  public void incNumReadLostBlocks() {
+    numReadLostBlocks.incr();
+  }
+
+  public void incNumDirectBlockWrites() {
+    numDirectBlockWrites.incr();
+  }
+
+  public void incNumWriteIOExceptionRetryBlocks() {
+    numWriteIOExceptionRetryBlocks.incr();
+  }
+
+  public void incNumWriteGenericExceptionRetryBlocks() {
+    numWriteGenericExceptionRetryBlocks.incr();
+  }
+
+  public void incNumFailedDirectBlockWrites() {
+    numFailedDirectBlockWrites.incr();
+  }
+
+  public void incNumFailedReadBlocks() {
+    numFailedReadBlocks.incr();
+  }
+
+  public void incNumBlockBufferFlushCompleted() {
+    numBlockBufferFlushCompleted.incr();
+  }
+
+  public void incNumBlockBufferFlushTriggered() {
+    numBlockBufferFlushTriggered.incr();
+  }
+
+  public void incNumDirtyLogBlockRead() {
+    numDirtyLogBlockRead.incr();
+  }
+
+  public void incNumBytesDirtyLogRead(int bytes) {
+    numBytesDirtyLogRead.incr(bytes);
+  }
+
+  public void incNumBlockBufferUpdates() {
+    numBlockBufferUpdates.incr();
+  }
+
+  public void incNumRetryLogBlockRead() {
+    numRetryLogBlockRead.incr();
+  }
+
+  public void incNumBytesRetryLogRead(int bytes) {
+    numBytesRetryLogRead.incr(bytes);
+  }
+
+  public void incNumBytesDirtyLogWritten(int bytes) {
+    numBytesDirtyLogWritten.incr(bytes);
+  }
+
+  public void incNumFailedBlockBufferFlushes() {
+    numFailedBlockBufferFlushes.incr();
+  }
+
+  public void incNumInterruptedBufferWaits() {
+    numInterruptedBufferWaits.incr();
+  }
+
+  public void incNumIllegalDirtyLogFiles() {
+    numIllegalDirtyLogFiles.incr();
+  }
+
+  public void incNumFailedDirtyLogFileDeletes() {
+    numFailedDirtyLogFileDeletes.incr();
+  }
+
+  public void incNumFailedRetryLogFileWrites() {
+    numFailedRetryLogFileWrites.incr();
+  }
+
+  public void incNumWriteMaxRetryBlocks() {
+    numWriteMaxRetryBlocks.incr();
+  }
+
+  public void incNumFailedReleaseLevelDB() {
+    numFailedReleaseLevelDB.incr();
+  }
+
+  public void updateDBReadLatency(long latency) {
+    dbReadLatency.add(latency);
+  }
+
+  public void updateContainerReadLatency(long latency) {
+    containerReadLatency.add(latency);
+  }
+
+  public void updateDBWriteLatency(long latency) {
+    dbWriteLatency.add(latency);
+  }
+
+  public void updateContainerWriteLatency(long latency) {
+    containerWriteLatency.add(latency);
+  }
+
+  public void updateDirectBlockWriteLatency(long latency) {
+    directBlockWriteLatency.add(latency);
+  }
+
+  public void updateBlockBufferFlushLatency(long latency) {
+    blockBufferFlushLatency.add(latency);
+  }
+
+  @VisibleForTesting
+  public long getNumReadOps() {
+    return numReadOps.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteOps() {
+    return numWriteOps.value();
+  }
+
+  @VisibleForTesting
+  public long getNumReadCacheHits() {
+    return numReadCacheHits.value();
+  }
+
+  @VisibleForTesting
+  public long getNumReadCacheMiss() {
+    return numReadCacheMiss.value();
+  }
+
+  @VisibleForTesting
+  public long getNumReadLostBlocks() {
+    return numReadLostBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumDirectBlockWrites() {
+    return numDirectBlockWrites.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedDirectBlockWrites() {
+    return numFailedDirectBlockWrites.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedReadBlocks() {
+    return numFailedReadBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteIOExceptionRetryBlocks() {
+    return numWriteIOExceptionRetryBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteGenericExceptionRetryBlocks() {
+    return numWriteGenericExceptionRetryBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockBufferFlushCompleted() {
+    return numBlockBufferFlushCompleted.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockBufferFlushTriggered() {
+    return numBlockBufferFlushTriggered.value();
+  }
+
+  @VisibleForTesting
+  public long getNumDirtyLogBlockRead() {
+    return numDirtyLogBlockRead.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBytesDirtyLogReads() {
+    return numBytesDirtyLogRead.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockBufferUpdates() {
+    return numBlockBufferUpdates.value();
+  }
+
+  @VisibleForTesting
+  public long getNumRetryLogBlockRead() {
+    return numRetryLogBlockRead.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBytesRetryLogReads() {
+    return numBytesRetryLogRead.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBytesDirtyLogWritten() {
+    return numBytesDirtyLogWritten.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedBlockBufferFlushes() {
+    return numFailedBlockBufferFlushes.value();
+  }
+
+  @VisibleForTesting
+  public long getNumInterruptedBufferWaits() {
+    return numInterruptedBufferWaits.value();
+  }
+
+  @VisibleForTesting
+  public long getNumIllegalDirtyLogFiles() {
+    return numIllegalDirtyLogFiles.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedDirtyLogFileDeletes() {
+    return numFailedDirtyLogFileDeletes.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedRetryLogFileWrites() {
+    return numFailedRetryLogFileWrites.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteMaxRetryBlocks() {
+    return numWriteMaxRetryBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedReleaseLevelDB() {
+    return numFailedReleaseLevelDB.value();
+  }
+}
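
For reference, a minimal sketch of how a consumer of these metrics might be
wired up (the metrics system name "CBlockMetrics" follows SCSITargetDaemon
further down in this commit; the harness class itself is hypothetical):

    // Hypothetical usage sketch -- not part of this patch.
    package org.apache.hadoop.cblock.jscsiHelper;

    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.util.Time;

    public class MetricsUsageSketch {
      public static void main(String[] args) {
        // Register the metrics source, mirroring SCSITargetDaemon below.
        DefaultMetricsSystem.initialize("CBlockMetrics");
        CBlockTargetMetrics metrics = CBlockTargetMetrics.create();

        long start = Time.monotonicNow();
        // ... a read served from the local LevelDB cache would go here ...
        metrics.incNumReadOps();
        metrics.incNumReadCacheHits();
        metrics.updateDBReadLatency(Time.monotonicNow() - start);

        System.out.println("reads so far: " + metrics.getNumReadOps());
      }
    }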

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java
new file mode 100644
index 0000000..75e013e
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.cblock.proto.MountVolumeResponse;
+import org.apache.hadoop.cblock.util.KeyUtil;
+import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.jscsi.target.Configuration;
+import org.jscsi.target.Target;
+import org.jscsi.target.TargetServer;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * This class extends the jSCSI target server, an iSCSI target that can be
+ * recognized by a remote machine with an iSCSI initiator installed.
+ */
+public final class CBlockTargetServer extends TargetServer {
+  private final OzoneConfiguration conf;
+  private final CBlockManagerHandler cBlockManagerHandler;
+  private final XceiverClientManager xceiverClientManager;
+  private final ContainerCacheFlusher containerCacheFlusher;
+  private final CBlockTargetMetrics metrics;
+
+  public CBlockTargetServer(OzoneConfiguration ozoneConfig,
+      Configuration jscsiConf,
+      CBlockManagerHandler cBlockManagerHandler,
+      CBlockTargetMetrics metrics)
+      throws IOException {
+    super(jscsiConf);
+    this.cBlockManagerHandler = cBlockManagerHandler;
+    this.xceiverClientManager = new XceiverClientManager(ozoneConfig);
+    this.conf = ozoneConfig;
+    this.containerCacheFlusher = new ContainerCacheFlusher(this.conf,
+        xceiverClientManager, metrics);
+    this.metrics = metrics;
+    LOGGER.info("Starting flusher thread.");
+    Thread flushListenerThread = new Thread(containerCacheFlusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+  }
+
+  public static void main(String[] args) throws Exception {
+  }
+
+  @Override
+  public boolean isValidTargetName(String checkTargetName) {
+    if (!KeyUtil.isValidVolumeKey(checkTargetName)) {
+      return false;
+    }
+    String userName = KeyUtil.getUserNameFromVolumeKey(checkTargetName);
+    String volumeName = KeyUtil.getVolumeFromVolumeKey(checkTargetName);
+    if (userName == null || volumeName == null) {
+      return false;
+    }
+    try {
+      MountVolumeResponse result =
+          cBlockManagerHandler.mountVolume(userName, volumeName);
+      if (!result.getIsValid()) {
+        LOGGER.error("Not a valid volume:" + checkTargetName);
+        return false;
+      }
+      String volumeKey = KeyUtil.getVolumeKey(result.getUserName(),
+          result.getVolumeName());
+      if (!targets.containsKey(volumeKey)) {
+        LOGGER.info("Mounting Volume. username: {} volume:{}",
+            userName, volumeName);
+        CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
+            .setUserName(userName)
+            .setVolumeName(volumeName)
+            .setVolumeSize(result.getVolumeSize())
+            .setBlockSize(result.getBlockSize())
+            .setContainerList(result.getContainerList())
+            .setClientManager(xceiverClientManager)
+            .setConf(this.conf)
+            .setFlusher(containerCacheFlusher)
+            .setCBlockTargetMetrics(metrics)
+            .build();
+        Target target = new Target(volumeKey, volumeKey, ozoneStore);
+        targets.put(volumeKey, target);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Can not connect to server when validating target!"
+          + e.getMessage());
+    }
+    return targets.containsKey(checkTargetName);
+  }
+
+  @Override
+  public String[] getTargetNames() {
+    try {
+      if (cBlockManagerHandler != null) {
+        return cBlockManagerHandler.listVolumes().
+            stream().map(
+              volumeInfo -> volumeInfo.getUserName() + ":" + volumeInfo
+                .getVolumeName()).toArray(String[]::new);
+      } else {
+        return new String[0];
+      }
+    } catch (IOException e) {
+      LOGGER.error("Can't list existing volumes", e);
+      return new String[0];
+    }
+  }
+
+  @VisibleForTesting
+  public HashMap<String, Target> getTargets() {
+    return targets;
+  }
+}
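
A hedged sketch of how a caller might exercise the target lookup path above
(assumes a constructed CBlockTargetServer as in SCSITargetDaemon below; the
"user:volume" target name shown is an assumption inferred from
getTargetNames, and the probe class is hypothetical):

    package org.apache.hadoop.cblock.jscsiHelper;

    final class TargetProbeSketch {
      static void probe(CBlockTargetServer server) {
        // A jSCSI login with target name "bilbo:vol1" drives a mountVolume()
        // call against the CBlock manager and, on success, registers a Target.
        if (server.isValidTargetName("bilbo:vol1")) {
          for (String name : server.getTargetNames()) {
            System.out.println("exported target: " + name);
          }
        }
      }
    }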

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
new file mode 100644
index 0000000..292662e
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.iq80.leveldb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_KEEP_ALIVE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_MAX_POOL_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_THREAD_PRIORITY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT;
+
+/**
+ * Class that writes to remote containers.
+ */
+public class ContainerCacheFlusher implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerCacheFlusher.class);
+  private final LinkedBlockingQueue<Message> messageQueue;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  private final ArrayBlockingQueue<Runnable> workQueue;
+  private final ConcurrentMap<String, RefCountedDB> dbMap;
+  private final ByteBuffer blockIDBuffer;
+  private final ConcurrentMap<String, Pipeline[]> pipelineMap;
+  private final AtomicLong remoteIO;
+  private final XceiverClientManager xceiverClientManager;
+  private final CBlockTargetMetrics metrics;
+  private AtomicBoolean shutdown;
+  private final long levelDBCacheSize;
+  private final int maxRetryCount;
+  private final String tracePrefix;
+
+  private final ConcurrentMap<String, FinishCounter> finishCountMap;
+
+  /**
+   * Constructs the flusher that writes cached blocks to the remote containers.
+   */
+  public ContainerCacheFlusher(Configuration config,
+      XceiverClientManager xceiverClientManager,
+      CBlockTargetMetrics metrics) {
+    int queueSize = config.getInt(DFS_CBLOCK_CACHE_QUEUE_SIZE_KB,
+        DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT) * 1024;
+    int corePoolSize = config.getInt(DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE,
+        DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT);
+    int maxPoolSize = config.getInt(DFS_CBLOCK_CACHE_MAX_POOL_SIZE,
+        DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT);
+    long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE,
+        DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT, TimeUnit.SECONDS);
+    int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
+        DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
+    int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
+        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
+    levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY,
+        DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB;
+
+    LOG.info("Cache: Core Pool Size: {}", corePoolSize);
+    LOG.info("Cache: Keep Alive: {}", keepAlive);
+    LOG.info("Cache: Max Pool Size: {}", maxPoolSize);
+    LOG.info("Cache: Thread Pri: {}", threadPri);
+    LOG.info("Cache: BlockBuffer Size: {}", blockBufferSize);
+
+    shutdown = new AtomicBoolean(false);
+    messageQueue = new LinkedBlockingQueue<>();
+    workQueue = new ArrayBlockingQueue<>(queueSize, true);
+
+    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Cache Block Writer Thread #%d")
+        .setDaemon(true)
+        .setPriority(threadPri)
+        .build();
+    threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
+        keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
+        new ThreadPoolExecutor.AbortPolicy());
+    threadPoolExecutor.prestartAllCoreThreads();
+
+    dbMap = new ConcurrentHashMap<>();
+    pipelineMap = new ConcurrentHashMap<>();
+    blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
+    this.xceiverClientManager = xceiverClientManager;
+    this.metrics = metrics;
+    this.remoteIO = new AtomicLong();
+
+    this.finishCountMap = new ConcurrentHashMap<>();
+    this.maxRetryCount =
+        config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,
+            CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);
+    this.tracePrefix = getTracePrefix();
+  }
+
+  private void checkExistingLog(String prefixFileName, File dbPath) {
+    if (!dbPath.exists()) {
+      LOG.debug("No existing dirty log found at {}", dbPath);
+      return;
+    }
+    LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
+    HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
+    traverse(prefixFileName, dbPath, allFiles);
+    for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
+      String parentPath = entry.getKey();
+      for (String fileName : entry.getValue()) {
+        LOG.info("found {} {} with prefix {}",
+            parentPath, fileName, prefixFileName);
+        processDirtyBlocks(parentPath, fileName);
+      }
+    }
+  }
+
+  private void traverse(String prefixFileName, File path,
+                        HashMap<String, ArrayList<String>> files) {
+    if (path.isFile()) {
+      if (path.getName().startsWith(prefixFileName)) {
+        LOG.debug("found this {} with {}", path.getParent(), path.getName());
+        if (!files.containsKey(path.getParent())) {
+          files.put(path.getParent(), new ArrayList<>());
+        }
+        files.get(path.getParent()).add(path.getName());
+      }
+    } else {
+      File[] listFiles = path.listFiles();
+      if (listFiles != null) {
+        for (File subPath : listFiles) {
+          traverse(prefixFileName, subPath, files);
+        }
+      }
+    }
+  }
+
+  /**
+   * Gets the CBlockTargetMetrics.
+   *
+   * @return CBlockTargetMetrics
+   */
+  public CBlockTargetMetrics getTargetMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Gets the XceiverClientManager.
+   *
+   * @return XceiverClientManager
+   */
+  public XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  /**
+   * Shutdown this instance.
+   */
+  public void shutdown() {
+    this.shutdown.set(true);
+    threadPoolExecutor.shutdown();
+  }
+
+  public long incrementRemoteIO() {
+    return remoteIO.incrementAndGet();
+  }
+
+  /**
+   * Processes a block cache file and queues those blocks for the remote I/O.
+   *
+   * @param dbPath - Location where the DB can be found.
+   * @param fileName - Block Cache File Name
+   */
+  public void processDirtyBlocks(String dbPath, String fileName) {
+    LOG.info("Adding {}/{} to queue. Queue Length: {}", dbPath, fileName,
+        messageQueue.size());
+    this.messageQueue.add(new Message(dbPath, fileName));
+  }
+
+  public Logger getLOG() {
+    return LOG;
+  }
+
+  /**
+   * Opens a DB if needed or returns a handle to an already open DB.
+   *
+   * @param dbPath -- dbPath
+   * @return the levelDB on the given path.
+   * @throws IOException
+   */
+  public synchronized LevelDBStore openDB(String dbPath)
+      throws IOException {
+    if (dbMap.containsKey(dbPath)) {
+      RefCountedDB refDB = dbMap.get(dbPath);
+      refDB.open();
+      return refDB.db;
+    } else {
+      Options options = new Options();
+      options.cacheSize(levelDBCacheSize);
+      options.createIfMissing(true);
+      LevelDBStore cacheDB = new LevelDBStore(
+          new File(getDBFileName(dbPath)), options);
+      RefCountedDB refDB = new RefCountedDB(dbPath, cacheDB);
+      dbMap.put(dbPath, refDB);
+      return cacheDB;
+    }
+  }
+
+  /**
+   * Updates the container map. This data never changes so we will update this
+   * during restarts and it should not hurt us.
+   *
+   * Once a CBlockLocalCache is registered, requeue the dirty/retry log files
+   * for the volume.
+   *
+   * @param dbPath - DbPath
+   * @param containerList - Container List.
+   */
+  public void register(String dbPath, Pipeline[] containerList) {
+    File dbFile = Paths.get(dbPath).toFile();
+    pipelineMap.put(dbPath, containerList);
+    checkExistingLog(AsyncBlockWriter.DIRTY_LOG_PREFIX, dbFile);
+    checkExistingLog(AsyncBlockWriter.RETRY_LOG_PREFIX, dbFile);
+  }
+
+  private String getDBFileName(String dbPath) {
+    return dbPath + ".db";
+  }
+
+  public LevelDBStore getCacheDB(String dbPath) throws IOException {
+    return openDB(dbPath);
+  }
+
+  public void releaseCacheDB(String dbPath) {
+    try {
+      closeDB(dbPath);
+    } catch (Exception e) {
+      metrics.incNumFailedReleaseLevelDB();
+      LOG.error("LevelDB close failed, dbPath:" + dbPath, e);
+    }
+  }
+  /**
+   * Close the DB if we don't have any outstanding references.
+   *
+   * @param dbPath - dbPath
+   * @throws IOException
+   */
+  public synchronized void closeDB(String dbPath) throws IOException {
+    if (dbMap.containsKey(dbPath)) {
+      RefCountedDB refDB = dbMap.get(dbPath);
+      int count = refDB.close();
+      if (count == 0) {
+        dbMap.remove(dbPath);
+      }
+    }
+  }
+
+  Pipeline getPipeline(String dbPath, long blockId) {
+    Pipeline[] containerList = pipelineMap.get(dbPath);
+    Preconditions.checkNotNull(containerList);
+    int containerIdx = (int) blockId % containerList.length;
+    long cBlockIndex =
+        Longs.fromByteArray(containerList[containerIdx].getData());
+    if (cBlockIndex > 0) {
+      // This catches the case when we get a wrong container in the ordering
+      // of the containers.
+      Preconditions.checkState(containerIdx % cBlockIndex == 0,
+          "The container ID computed should match with the container index " +
+              "returned from cBlock Server.");
+    }
+    return containerList[containerIdx];
+  }
+
+  public void incFinishCount(String fileName) {
+    if (!finishCountMap.containsKey(fileName)) {
+      LOG.error("No record for such file:" + fileName);
+      return;
+    }
+    finishCountMap.get(fileName).incCount();
+    if (finishCountMap.get(fileName).isFileDeleted()) {
+      finishCountMap.remove(fileName);
+    }
+  }
+
+  /**
+   * Main loop of the flusher thread. Drains the message queue, reads the
+   * block IDs from each dirty/retry log file and submits a BlockWriterTask
+   * for every block so that it gets written to the remote containers.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    while (!this.shutdown.get()) {
+      try {
+        Message message = messageQueue.take();
+        LOG.debug("Got message to process -- DB Path : {} , FileName; {}",
+            message.getDbPath(), message.getFileName());
+        String fullPath = Paths.get(message.getDbPath(),
+            message.getFileName()).toString();
+        String[] fileNameParts = message.getFileName().split("\\.");
+        Preconditions.checkState(fileNameParts.length > 1);
+        String fileType = fileNameParts[0];
+        boolean isDirtyLogFile =
+            fileType.equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX);
+        ReadableByteChannel fileChannel = new FileInputStream(fullPath)
+            .getChannel();
+        // TODO: We can batch and unique the IOs here. First getting the code
+        // to work, we will add those later.
+        int bytesRead = fileChannel.read(blockIDBuffer);
+        fileChannel.close();
+        LOG.debug("Read blockID log of size: {} position {} remaining {}",
+            bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining());
+        // The current position in the buffer (in bytes), divided by the
+        // number of bytes per long (bits per long divided by bits per byte),
+        // gives the number of blocks read.
+        int blockCount = blockIDBuffer.position() / (Long.SIZE / Byte.SIZE);
+        if (isDirtyLogFile) {
+          getTargetMetrics().incNumBytesDirtyLogRead(bytesRead);
+        } else {
+          getTargetMetrics().incNumBytesRetryLogRead(bytesRead);
+        }
+        if (finishCountMap.containsKey(message.getFileName())) {
+          // In theory this should never happen. But if it happened,
+          // we need to know it...
+          getTargetMetrics().incNumIllegalDirtyLogFiles();
+          LOG.error("Adding DirtyLog file again {} current count {} new {}",
+              message.getFileName(),
+              finishCountMap.get(message.getFileName()).expectedCount,
+              blockCount);
+        }
+        finishCountMap.put(message.getFileName(),
+            new FinishCounter(blockCount, message.getDbPath(),
+                message.getFileName(), this));
+        // flip() rather than rewind(), so that the limit is set to the end
+        // of the data that was just read.
+        blockIDBuffer.flip();
+        LOG.debug("Remaining blocks count {} and {}", 
blockIDBuffer.remaining(),
+            blockCount);
+        while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
+          long blockID = blockIDBuffer.getLong();
+          int retryCount = 0;
+          if (isDirtyLogFile) {
+            getTargetMetrics().incNumDirtyLogBlockRead();
+          } else {
+            getTargetMetrics().incNumRetryLogBlockRead();
+            Preconditions.checkState(fileNameParts.length == 4);
+            retryCount = Integer.parseInt(fileNameParts[3]);
+          }
+          LogicalBlock block = new DiskBlock(blockID, null, false);
+          BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
+              message.getDbPath(), retryCount, message.getFileName(),
+              maxRetryCount);
+          threadPoolExecutor.submit(blockWriterTask);
+        }
+        blockIDBuffer.clear();
+      } catch (InterruptedException e) {
+        LOG.info("ContainerCacheFlusher is interrupted.", e);
+      } catch (FileNotFoundException e) {
+        LOG.error("Unable to find the dirty blocks file. This will cause " +
+            "data errors. Please stop using this volume.", e);
+      } catch (IOException e) {
+        LOG.error("Unable to read the dirty blocks file. This will cause " +
+            "data errors. Please stop using this volume.", e);
+      } catch (Exception e) {
+        LOG.error("Generic exception.", e);
+      }
+    }
+    LOG.info("Exiting flusher");
+  }
+
+  /**
+   * Tries to get the local host IP Address as trace prefix
+   * for creating trace IDs, otherwise uses a random UUID for it.
+   */
+  private static String getTracePrefix() {
+    String tmp;
+    try {
+      tmp = InetAddress.getLocalHost().getHostAddress();
+    } catch (UnknownHostException ex) {
+      tmp = UUID.randomUUID().toString();
+      LOG.error("Unable to read the host address. Using a GUID for " +
+          "hostname:{} ", tmp, ex);
+    }
+    return tmp;
+  }
+
+  /**
+   * We create a trace ID to make it easy to debug issues.
+   * A trace ID is in IPAddress:UserName:VolumeName:blockID:second format.
+   *
+   * This will get written down on the data node if we get any failures, so
+   * with this trace ID we can correlate cBlock failures across machines.
+   *
+   * @param blockID - Block ID
+   * @return trace ID
+   */
+  public String getTraceID(File dbPath, long blockID) {
+    String volumeName = dbPath.getName();
+    String userName = dbPath.getParentFile().getName();
+    // mapping to seconds to make the string smaller.
+    return tracePrefix + ":" + userName + ":" + volumeName
+        + ":" + blockID + ":" + Time.monotonicNow() / 1000;
+  }
+
+  /**
+   * Keeps a Reference counted DB that we close only when the total Reference
+   * has gone to zero.
+   */
+  private static class RefCountedDB {
+    private LevelDBStore db;
+    private AtomicInteger refcount;
+    private String dbPath;
+
+    /**
+     * RefCountedDB DB ctor.
+     *
+     * @param dbPath - DB path.
+     * @param db - LevelDBStore db
+     */
+    RefCountedDB(String dbPath, LevelDBStore db) {
+      this.db = db;
+      this.refcount = new AtomicInteger(1);
+      this.dbPath = dbPath;
+    }
+
+    /**
+     * close the DB if possible.
+     */
+    public int close() throws IOException {
+      int count = this.refcount.decrementAndGet();
+      if (count == 0) {
+        LOG.info("Closing the LevelDB. {} ", this.dbPath);
+        db.close();
+      }
+      return count;
+    }
+
+    public void open() {
+      this.refcount.incrementAndGet();
+    }
+  }
+
+  /**
+   * The message held in the processing queue.
+   */
+  private static class Message {
+    private String dbPath;
+    private String fileName;
+
+    /**
+     * A message that holds the path of the directory that contains the db
+     * and the name of the dirty block log file to process.
+     *
+     * @param dbPath
+     * @param fileName
+     */
+    Message(String dbPath, String fileName) {
+      this.dbPath = dbPath;
+      this.fileName = fileName;
+    }
+
+    public String getDbPath() {
+      return dbPath;
+    }
+
+    public void setDbPath(String dbPath) {
+      this.dbPath = dbPath;
+    }
+
+    public String getFileName() {
+      return fileName;
+    }
+
+    public void setFileName(String fileName) {
+      this.fileName = fileName;
+    }
+  }
+
+  private static class FinishCounter {
+    private final long expectedCount;
+    private final String dbPath;
+    private final String dirtyLogPath;
+    private final AtomicLong currentCount;
+    private AtomicBoolean fileDeleted;
+    private final ContainerCacheFlusher flusher;
+
+    FinishCounter(long expectedCount, String dbPath,
+        String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException {
+      this.expectedCount = expectedCount;
+      this.dbPath = dbPath;
+      this.dirtyLogPath = dirtyLogPath;
+      this.currentCount = new AtomicLong(0);
+      this.fileDeleted = new AtomicBoolean(false);
+      this.flusher = flusher;
+    }
+
+    public boolean isFileDeleted() {
+      return fileDeleted.get();
+    }
+
+    public void incCount() {
+      long count = this.currentCount.incrementAndGet();
+      if (count >= expectedCount) {
+        String filePath = String.format("%s/%s", dbPath, dirtyLogPath);
+        LOG.debug(
+            "Deleting {} with count {} {}", filePath, count, expectedCount);
+        try {
+          Path path = Paths.get(filePath);
+          Files.delete(path);
+          // the following part tries to remove the directory if it is empty
+          // but not sufficient, because the .db directory still exists....
+          // TODO how to handle the .db directory?
+          /*Path parent = path.getParent();
+          if (parent.toFile().listFiles().length == 0) {
+            Files.delete(parent);
+          }*/
+          fileDeleted.set(true);
+        } catch (Exception e) {
+          flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes();
+          LOG.error("Error deleting dirty log file:" + filePath, e);
+        }
+      }
+    }
+  }
+}
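
The dirty/retry log files consumed by run() above are just sequences of
8-byte block IDs. A small self-contained sketch of that layout, assuming
made-up block IDs and a made-up file name:

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.nio.ByteBuffer;

    public class DirtyLogSketch {
      public static void main(String[] args) throws Exception {
        // Write three block IDs back to back, as the dirty log does.
        ByteBuffer buffer = ByteBuffer.allocateDirect(3 * (Long.SIZE / Byte.SIZE));
        for (long blockId : new long[] {7L, 42L, 1024L}) {
          buffer.putLong(blockId);
        }
        buffer.flip();
        try (FileOutputStream out = new FileOutputStream("DirtyLog.sketch")) {
          out.getChannel().write(buffer);
        }

        // Read it back the way ContainerCacheFlusher.run() does: read the
        // channel into a buffer, flip, then drain one long at a time.
        ByteBuffer read = ByteBuffer.allocateDirect(1024);
        try (FileInputStream in = new FileInputStream("DirtyLog.sketch")) {
          in.getChannel().read(read);
        }
        read.flip();
        while (read.remaining() >= (Long.SIZE / Byte.SIZE)) {
          System.out.println("queue block " + read.getLong());
        }
      }
    }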

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
new file mode 100644
index 0000000..f164f38
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ISCSI_ADVERTISED_IP;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ISCSI_ADVERTISED_PORT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT;
+
+import org.apache.hadoop.cblock.CblockUtils;
+import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
+import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.scm.client.ContainerOperationClient;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.jscsi.target.Configuration;
+
+import java.net.InetSocketAddress;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
+
+/**
+ * This class runs the target server process.
+ */
+public final class SCSITargetDaemon {
+  public static void main(String[] args) throws Exception {
+    CblockUtils.activateConfigs();
+    OzoneConfiguration ozoneConf = new OzoneConfiguration();
+
+    RPC.setProtocolEngine(ozoneConf, CBlockClientServerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY,
+        DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT);
+    ContainerOperationClient.setContainerSizeB(
+        containerSizeGB * OzoneConsts.GB);
+    String jscsiServerAddress = ozoneConf.get(
+        DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY,
+        DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT);
+    String cbmIPAddress = ozoneConf.get(
+        DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY,
+        DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT
+    );
+    int cbmPort = ozoneConf.getInt(
+        DFS_CBLOCK_JSCSI_PORT_KEY,
+        DFS_CBLOCK_JSCSI_PORT_DEFAULT
+    );
+
+    String scmAddress = ozoneConf.get(OZONE_SCM_CLIENT_BIND_HOST_KEY,
+        OZONE_SCM_CLIENT_BIND_HOST_DEFAULT);
+    int scmClientPort = ozoneConf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
+        OZONE_SCM_CLIENT_PORT_DEFAULT);
+    int scmDatanodePort = ozoneConf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
+        OZONE_SCM_DATANODE_PORT_DEFAULT);
+
+    String scmClientAddress = scmAddress + ":" + scmClientPort;
+    String scmDataodeAddress = scmAddress + ":" + scmDatanodePort;
+
+    ozoneConf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, scmClientAddress);
+    ozoneConf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, scmDataodeAddress);
+
+    InetSocketAddress cbmAddress = new InetSocketAddress(
+        cbmIPAddress, cbmPort);
+    long version = RPC.getProtocolVersion(
+        CBlockServiceProtocolPB.class);
+    CBlockClientProtocolClientSideTranslatorPB cbmClient =
+        new CBlockClientProtocolClientSideTranslatorPB(
+            RPC.getProxy(CBlockClientServerProtocolPB.class, version,
+                cbmAddress, UserGroupInformation.getCurrentUser(), ozoneConf,
+                NetUtils.getDefaultSocketFactory(ozoneConf), 5000)
+        );
+    CBlockManagerHandler cbmHandler = new CBlockManagerHandler(cbmClient);
+
+    String advertisedAddress = ozoneConf.
+        getTrimmed(DFS_CBLOCK_ISCSI_ADVERTISED_IP, jscsiServerAddress);
+
+    int advertisedPort = ozoneConf.
+        getInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT,
+            DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT);
+
+    Configuration jscsiConfig =
+        new Configuration(jscsiServerAddress,
+            advertisedAddress,
+            advertisedPort);
+    DefaultMetricsSystem.initialize("CBlockMetrics");
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    CBlockTargetServer targetServer = new CBlockTargetServer(
+        ozoneConf, jscsiConfig, cbmHandler, metrics);
+
+    targetServer.call();
+  }
+
+  private SCSITargetDaemon() {
+
+  }
+}
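
A minimal sketch of overriding the addresses the daemon reads above; the key
names are the constants imported in SCSITargetDaemon, while the address and
port values here are placeholders rather than defaults from this patch:

    import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
    import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_ISCSI_ADVERTISED_IP;
    import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_ISCSI_ADVERTISED_PORT;
    import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY;

    public class TargetDaemonConfSketch {
      public static OzoneConfiguration sketch() {
        OzoneConfiguration conf = new OzoneConfiguration();
        conf.set(DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY, "0.0.0.0");   // bind address
        conf.set(DFS_CBLOCK_ISCSI_ADVERTISED_IP, "192.0.2.10");     // address initiators connect to
        conf.setInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT, 3260);        // conventional iSCSI port
        return conf;
      }
    }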

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
new file mode 100644
index 0000000..300b2ae
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache;
+
+import java.io.IOException;
+
+/**
+ * Defines the interface for cache implementations. The cache is called by
+ * the CBlock storage module when it performs I/O operations.
+ */
+public interface CacheModule {
+  /**
+   * Checks if the key is cached; if yes, returns the cached object,
+   * otherwise loads it from the data source and puts it into the cache.
+   *
+   * @param blockID
+   * @return the target block.
+   */
+  LogicalBlock get(long blockID) throws IOException;
+
+  /**
+   * Puts the value for the key into the cache.
+   * @param blockID
+   * @param value
+   */
+  void put(long blockID, byte[] value) throws IOException;
+
+  void flush() throws IOException;
+
+  void start() throws IOException;
+
+  void stop() throws IOException;
+
+  void close() throws IOException;
+
+  boolean isDirtyCache();
+}
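
A hedged sketch of the call sequence a storage module would drive against
this interface (the cache instance itself would come from an implementation
such as CBlockLocalCache; the helper class here is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
    import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;

    final class CacheCallSketch {
      static void writeThenRead(CacheModule cache, long blockID, byte[] data)
          throws IOException {
        cache.start();
        cache.put(blockID, data);                 // write path
        LogicalBlock block = cache.get(blockID);  // read path: hit or load
        System.out.println("read block " + block.getBlockID()
            + " persisted=" + block.isPersisted());
        if (cache.isDirtyCache()) {
          cache.flush();                          // push pending blocks out
        }
        cache.stop();
        cache.close();
      }
    }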

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
new file mode 100644
index 0000000..470826f
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Logical Block is the data structure that we write to the cache;
+ * the key and data get written to remote containers. The rest is used for
+ * bookkeeping in the cache.
+ */
+public interface LogicalBlock {
+  /**
+   * Returns the data stream of this block.
+   * @return - ByteBuffer
+   */
+  ByteBuffer getData();
+
+  /**
+   * Frees the byte buffer since we don't need it any more.
+   */
+  void clearData();
+
+  /**
+   * Returns the Block ID for this Block.
+   * @return long - BlockID
+   */
+  long getBlockID();
+
+  /**
+   * Flag that tells us if this block has been persisted to container.
+   * @return whether this block is now persistent
+   */
+  boolean isPersisted();
+}
\ No newline at end of file
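
For illustration only, a minimal heap-backed implementation of this
interface might look like the following (DiskBlock elsewhere in this patch
plays this role for the flusher; this sketch is hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;

    final class HeapBlock implements LogicalBlock {
      private final long blockID;
      private ByteBuffer data;
      private boolean persisted;

      HeapBlock(long blockID, byte[] bytes) {
        this.blockID = blockID;
        this.data = bytes == null ? null : ByteBuffer.wrap(bytes);
      }

      @Override
      public ByteBuffer getData() {
        return data;
      }

      @Override
      public void clearData() {
        data = null;   // allow the buffer to be garbage collected
      }

      @Override
      public long getBlockID() {
        return blockID;
      }

      @Override
      public boolean isPersisted() {
        return persisted;
      }

      void markPersisted() {
        persisted = true;
      }
    }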

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
new file mode 100644
index 0000000..992578f
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A writer that is used to write blocks asynchronously to the container.
+ */
+public class AsyncBlockWriter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AsyncBlockWriter.class);
+
+  /**
+   * XceiverClientManager is used to get client connections to a set of
+   * machines.
+   */
+  private final XceiverClientManager xceiverClientManager;
+
+  /**
+   * This lock is used as a signal to the re-queuing thread. The requeue
+   * thread wakes up as soon as it is signaled that some blocks are in the
+   * retry queue. We retry aggressively since a re-queued block automatically
+   * moves to the end of the queue.
+   * <p>
+   * In the event a container is unavailable for a long time, we can either
+   * fail all writes or remap and let the writes succeed. The easier
+   * semantics is to fail the volume until the container is recovered by SCM.
+   */
+  private final Lock lock;
+  private final Condition notEmpty;
+  /**
+   * The cache this writer is operating against.
+   */
+  private final CBlockLocalCache parentCache;
+  private final BlockBufferManager blockBufferManager;
+  public final static String DIRTY_LOG_PREFIX = "DirtyLog";
+  public static final String RETRY_LOG_PREFIX = "RetryLog";
+  private AtomicLong localIoCount;
+
+  /**
+   * Constructs an Async Block Writer.
+   *
+   * @param config - Config
+   * @param cache - Parent Cache for this writer
+   */
+  public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) {
+
+    Preconditions.checkNotNull(cache, "Cache cannot be null.");
+    Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
+    localIoCount = new AtomicLong();
+    lock = new ReentrantLock();
+    notEmpty = lock.newCondition();
+    parentCache = cache;
+    xceiverClientManager = cache.getClientManager();
+    blockBufferManager = new BlockBufferManager(config, parentCache);
+  }
+
+  public void start() throws IOException {
+    File logDir = new File(parentCache.getDbPath().toString());
+    if (!logDir.exists() && !logDir.mkdirs()) {
+      LOG.error("Unable to create the log directory, Critical error cannot " +
+          "continue. Log Dir : {}", logDir);
+      throw new IllegalStateException("Cache Directory create failed, Cannot " +
+          "continue. Log Dir: " + logDir);
+    }
+    blockBufferManager.start();
+  }
+
+  /**
+   * Return the log to write to.
+   *
+   * @return Logger.
+   */
+  public static Logger getLOG() {
+    return LOG;
+  }
+
+  /**
+   * Get the CacheDB.
+   *
+   * @return LevelDB Handle
+   */
+  LevelDBStore getCacheDB() {
+    return parentCache.getCacheDB();
+  }
+
+  /**
+   * Returns the client manager.
+   *
+   * @return XceiverClientManager
+   */
+  XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  /**
+   * Increments the count of local I/O packets that have gone into this device.
+   */
+  public long incrementLocalIO() {
+    return localIoCount.incrementAndGet();
+  }
+
+  /**
+   * Returns the local I/O count for this device.
+   * @return the count of I/O operations
+   */
+  public long getLocalIOCount() {
+    return localIoCount.get();
+  }
+
+  /**
+   * Writes a block to LevelDB store and queues a work item for the system to
+   * sync the block to containers.
+   *
+   * @param block - Logical Block
+   */
+  public void writeBlock(LogicalBlock block) throws IOException {
+    byte[] keybuf = Longs.toByteArray(block.getBlockID());
+    String traceID = parentCache.getTraceID(block.getBlockID());
+    if (parentCache.isShortCircuitIOEnabled()) {
+      long startTime = Time.monotonicNow();
+      getCacheDB().put(keybuf, block.getData().array());
+      incrementLocalIO();
+      long endTime = Time.monotonicNow();
+      parentCache.getTargetMetrics().updateDBWriteLatency(
+                                                    endTime - startTime);
+      if (parentCache.isTraceEnabled()) {
+        String datahash = DigestUtils.sha256Hex(block.getData().array());
+        parentCache.getTracer().info(
+            "Task=WriterTaskDBPut,BlockID={},Time={},SHA={}",
+            block.getBlockID(), endTime - startTime, datahash);
+      }
+      block.clearData();
+      blockBufferManager.addToBlockBuffer(block.getBlockID());
+    } else {
+      Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
+      String containerName = pipeline.getContainerName();
+      XceiverClientSpi client = null;
+      try {
+        long startTime = Time.monotonicNow();
+        client = parentCache.getClientManager()
+            .acquireClient(parentCache.getPipeline(block.getBlockID()));
+        ContainerProtocolCalls.writeSmallFile(client, containerName,
+            Long.toString(block.getBlockID()), block.getData().array(),
+            traceID);
+        long endTime = Time.monotonicNow();
+        if (parentCache.isTraceEnabled()) {
+          String datahash = DigestUtils.sha256Hex(block.getData().array());
+          parentCache.getTracer().info(
+              "Task=DirectWriterPut,BlockID={},Time={},SHA={}",
+              block.getBlockID(), endTime - startTime, datahash);
+        }
+        parentCache.getTargetMetrics().
+            updateDirectBlockWriteLatency(endTime - startTime);
+        parentCache.getTargetMetrics().incNumDirectBlockWrites();
+      } catch (Exception ex) {
+        parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
+        LOG.error("Direct I/O writing of block:{} traceID:{} to "
+            + "container {} failed", block.getBlockID(), traceID,
+            containerName, ex);
+        throw ex;
+      } finally {
+        if (client != null) {
+          parentCache.getClientManager().releaseClient(client);
+        }
+        block.clearData();
+      }
+    }
+  }
+
+  /**
+   * Shuts down by writing any pending I/O to the dirty log buffer.
+   */
+  public void shutdown() {
+    blockBufferManager.shutdown();
+  }
+
+  /**
+   * Returns tracer.
+   *
+   * @return Tracer
+   */
+  Logger getTracer() {
+    return parentCache.getTracer();
+  }
+
+}
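
A minimal sketch, outside this patch, of the key encoding writeBlock relies
on: block IDs are turned into 8-byte big-endian LevelDB keys with Guava's
Longs, and fromByteArray would recover the ID from such a key on the read
path. The class name below is illustrative only.

    import com.google.common.primitives.Longs;

    /** Illustrative only; shows the Longs-based key round trip. */
    public final class BlockKeyEncodingSketch {
      public static void main(String[] args) {
        long blockID = 42L;                       // any logical block ID
        byte[] key = Longs.toByteArray(blockID);  // 8-byte big-endian key
        long decoded = Longs.fromByteArray(key);  // recovers the same ID
        System.out.println("key bytes=" + key.length + " decoded=" + decoded);
      }
    }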

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
new file mode 100644
index 0000000..c61a7a4
--- /dev/null
+++ 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+
+/**
+ * This task is responsible for flushing the BlockIDBuffer
+ * to the Dirty Log file. This Dirty Log file is used later by
+ * the ContainerCacheFlusher when the data is written to the containers.
+ */
+public class BlockBufferFlushTask implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockBufferFlushTask.class);
+  private final CBlockLocalCache parentCache;
+  private final BlockBufferManager bufferManager;
+  private final ByteBuffer blockIDBuffer;
+
+  BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache,
+                       BlockBufferManager manager) {
+    this.parentCache = parentCache;
+    this.bufferManager = manager;
+    this.blockIDBuffer = blockIDBuffer;
+  }
+
+  /**
+   * When an object implementing interface <code>Runnable</code> is used
+   * to create a thread, starting the thread causes the object's
+   * <code>run</code> method to be called in that separately executing
+   * thread.
+   * <p>
+   * The general contract of the method <code>run</code> is that it may
+   * take any action whatsoever.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    try {
+      writeBlockBufferToFile(blockIDBuffer);
+    } catch (Exception e) {
+      parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes();
+      LOG.error("Unable to sync the Block map to disk with "
+          + (blockIDBuffer.position() / Long.SIZE) + "entries "
+          + "-- NOTE: This might cause a data loss or corruption", e);
+    } finally {
+      bufferManager.releaseBuffer(blockIDBuffer);
+    }
+  }
+
+  /**
+   * Write Block Buffer to file.
+   *
+   * @param buffer - ByteBuffer
+   * @throws IOException
+   */
+  private void writeBlockBufferToFile(ByteBuffer buffer)
+      throws IOException {
+    long startTime = Time.monotonicNow();
+    boolean append = false;
+
+    // If there is nothing written to blockId buffer,
+    // then skip flushing of blockId buffer
+    if (buffer.position() == 0) {
+      return;
+    }
+
+    buffer.flip();
+    String fileName =
+        String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX,
+                      Time.monotonicNow());
+    String log = Paths.get(parentCache.getDbPath().toString(), fileName)
+        .toString();
+
+    int bytesWritten;
+    try (FileChannel channel =
+             new FileOutputStream(log, append).getChannel()) {
+      bytesWritten = channel.write(buffer);
+    }
+    buffer.clear();
+    parentCache.processDirtyMessage(fileName);
+    long endTime = Time.monotonicNow();
+    if (parentCache.isTraceEnabled()) {
+      parentCache.getTracer().info(
+          "Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
+          endTime - startTime, bytesWritten);
+    }
+
+    parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted();
+    parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
+    parentCache.getTargetMetrics().
+        updateBlockBufferFlushLatency(endTime - startTime);
+    LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
+        bytesWritten, endTime - startTime);
+  }
+}
\ No newline at end of file
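
As a hedged sketch of the file format produced above: the dirty log is just a
run of 8-byte block IDs, so a consumer (in this patch, the flusher reached via
processDirtyMessage) could walk it as below. The class name and path handling
are illustrative, not the flusher's actual code.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    /** Illustrative reader for a DirtyLog file of packed longs. */
    public final class DirtyLogReaderSketch {
      public static void main(String[] args) throws IOException {
        String log = args[0];  // path to a dirty log, supplied by the caller
        try (FileChannel channel =
                 new RandomAccessFile(log, "r").getChannel()) {
          ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
          channel.read(buffer);
          buffer.flip();
          while (buffer.remaining() >= Long.BYTES) {
            System.out.println("dirty block: " + buffer.getLong());
          }
        }
      }
    }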

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
new file mode 100644
index 0000000..5d3209c
--- /dev/null
+++ 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_KEEP_ALIVE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_THREAD_PRIORITY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
+
+/**
+ * This class manages the block ID buffer.
+ * The block ID buffer keeps a list of blocks which are in the LevelDB cache.
+ * This buffer is used later when the blocks are flushed to the containers.
+ *
+ * Two blockIDBuffers are maintained so that writes are not blocked while
+ * the DirtyLog is being written. Once a blockIDBuffer is full, it is
+ * enqueued for a DirtyLog write while the other buffer accepts new writes.
+ * Once the DirtyLog write is done, the buffer is returned to the pool.
+ *
+ * There are three triggers for a blockIDBuffer flush:
+ * 1) The blockIDBuffer is full,
+ * 2) The time period defined for the blockIDBuffer flush has elapsed,
+ * 3) Shutdown.
+ */
+public class BlockBufferManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockBufferManager.class);
+
+  private enum FlushReason {
+    BUFFER_FULL,
+    SHUTDOWN,
+    TIMER
+  };
+
+  private final int blockBufferSize;
+  private final CBlockLocalCache parentCache;
+  private final ScheduledThreadPoolExecutor scheduledExecutor;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  private final long intervalSeconds;
+  private final ArrayBlockingQueue<ByteBuffer> acquireQueue;
+  private final ArrayBlockingQueue<Runnable> workQueue;
+  private ByteBuffer currentBuffer;
+
+  BlockBufferManager(Configuration config, CBlockLocalCache parentCache) {
+    this.parentCache = parentCache;
+    this.scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+
+    this.intervalSeconds =
+        config.getTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL,
+            DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT,
+            TimeUnit.SECONDS);
+
+    long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE,
+        DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT,
+        TimeUnit.SECONDS);
+    this.workQueue = new ArrayBlockingQueue<>(2, true);
+    int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
+        DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
+    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Cache Block Buffer Manager Thread #%d")
+        .setDaemon(true)
+        .setPriority(threadPri)
+        .build();
+    /*
+     * starting a thread pool with core pool size of 1 and maximum of 2 threads
+     * as there are maximum of 2 buffers which can be flushed at the same time.
+     */
+    this.threadPoolExecutor = new ThreadPoolExecutor(1, 2,
+        keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
+        new ThreadPoolExecutor.AbortPolicy());
+
+    this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
+        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
+    this.acquireQueue = new ArrayBlockingQueue<>(2, true);
+
+    for (int i = 0; i < 2; i++) {
+      acquireQueue.add(ByteBuffer.allocate(blockBufferSize));
+    }
+    // get the first buffer to be used
+    this.currentBuffer = acquireQueue.remove();
+
+    LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}",
+        blockBufferSize, intervalSeconds);
+  }
+
+  // triggerBlockBufferFlush enqueues the current ByteBuffer for flush and
+  // returns. The enqueue is asynchronous, so triggerBlockBufferFlush only
+  // blocks when there are no available buffers in the acquireQueue. Once
+  // the DirtyLog write is done, the buffer is returned to BlockBufferManager
+  // via releaseBuffer.
+  private synchronized void triggerBlockBufferFlush(FlushReason reason) {
+    LOG.debug("Flush triggered because: " + reason.toString() +
+        " Num entries in buffer: " +
+        currentBuffer.position() / (Long.SIZE / Byte.SIZE) +
+        " Acquire Queue Size: " + acquireQueue.size());
+
+    parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered();
+    BlockBufferFlushTask flushTask =
+        new BlockBufferFlushTask(currentBuffer, parentCache, this);
+    threadPoolExecutor.submit(flushTask);
+    try {
+      currentBuffer = acquireQueue.take();
+    } catch (InterruptedException ex) {
+      currentBuffer = null;
+      parentCache.getTargetMetrics().incNumInterruptedBufferWaits();
+      LOG.error("wait on take operation on acquire queue interrupted", ex);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public synchronized void addToBlockBuffer(long blockId)  {
+    parentCache.getTargetMetrics().incNumBlockBufferUpdates();
+    currentBuffer.putLong(blockId);
+    // if no space left, flush this buffer
+    if (currentBuffer.remaining() == 0) {
+      triggerBlockBufferFlush(FlushReason.BUFFER_FULL);
+    }
+  }
+
+  public void releaseBuffer(ByteBuffer buffer) {
+    if (buffer.position() != 0) {
+      LOG.error("requeuing a non empty buffer with:{}",
+          "elements enqueued in the acquire queue",
+          buffer.position() / (Long.SIZE / Byte.SIZE));
+      buffer.reset();
+    }
+    // There should always be space in the queue to add an element
+    acquireQueue.add(buffer);
+  }
+
+  // Start a scheduled task to flush blockIDBuffer
+  public void start() {
+    Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER);
+    scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds,
+                                        intervalSeconds, TimeUnit.SECONDS);
+    threadPoolExecutor.prestartAllCoreThreads();
+  }
+
+  public void shutdown() {
+    triggerBlockBufferFlush(FlushReason.SHUTDOWN);
+    scheduledExecutor.shutdown();
+    threadPoolExecutor.shutdown();
+  }
+}
\ No newline at end of file
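
The double-buffer handoff above can be reduced to the following standalone
sketch: two preallocated ByteBuffers rotate through a fair
ArrayBlockingQueue, so one buffer can be flushed while the other keeps
accepting block IDs. The buffer size and block ID below are placeholders.

    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;

    /** Illustrative double-buffer rotation, mirroring BlockBufferManager. */
    public final class DoubleBufferSketch {
      public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<ByteBuffer> acquireQueue =
            new ArrayBlockingQueue<>(2, true);
        for (int i = 0; i < 2; i++) {
          acquireQueue.add(ByteBuffer.allocate(128 * Long.BYTES)); // placeholder
        }
        ByteBuffer current = acquireQueue.remove(); // buffer accepting writes
        current.putLong(7L);                        // addToBlockBuffer analogue
        ByteBuffer flushing = current;              // full buffer handed off
        current = acquireQueue.take();              // blocks only if both in flight
        flushing.clear();                           // flush task done with it
        acquireQueue.add(flushing);                 // releaseBuffer analogue
        System.out.println("pending entries: "
            + current.position() / Long.BYTES);
      }
    }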

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
new file mode 100644
index 0000000..1149164
--- /dev/null
+++ 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_TRACE_IO_DEFAULT;
+
+/**
+ * A local cache used by the CBlock ISCSI server. This class is enabled or
+ * disabled via config settings.
+ */
+public class CBlockLocalCache implements CacheModule {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CBlockLocalCache.class);
+  private static final Logger TRACER =
+      LoggerFactory.getLogger("TraceIO");
+
+  private final Configuration conf;
+  /**
+   * LevelDB cache file.
+   */
+  private final LevelDBStore cacheDB;
+
+  /**
+   * AsyncBlock writer updates the cacheDB and writes the blocks async to
+   * remote containers.
+   */
+  private final AsyncBlockWriter blockWriter;
+
+  /**
+   * The sync block reader tries to read from the cache; if we get a cache
+   * miss, we fetch the block from the remote location. It will
+   * asynchronously update the cacheDB.
+   */
+  private final SyncBlockReader blockReader;
+  private final String userName;
+  private final String volumeName;
+
+  /**
+   * From a block ID we are able to get the pipeline by indexing this array.
+   */
+  private final Pipeline[] containerList;
+  private final int blockSize;
+  private XceiverClientManager clientManager;
+  /**
+   * If this flag is enabled then the cache traces all I/O; all reads and
+   * writes are visible in the log with the SHA of the block written. This
+   * makes the system slower, so use it only for debugging or for creating
+   * trace simulations.
+  private final boolean traceEnabled;
+  private final boolean enableShortCircuitIO;
+  private final long volumeSize;
+  private long currentCacheSize;
+  private File dbPath;
+  private final ContainerCacheFlusher flusher;
+  private CBlockTargetMetrics cblockTargetMetrics;
+
+  /**
+   * Get Db Path.
+   * @return the file instance of the db.
+   */
+  public File getDbPath() {
+    return dbPath;
+  }
+
+  /**
+   * Constructor for CBlockLocalCache invoked via the builder.
+   *
+   * @param conf -  Configuration
+   * @param volumeName - volume Name
+   * @param userName - user name
+   * @param containerPipelines - Pipelines that make up this container
+   * @param blockSize - blockSize
+   * @param flusher - flusher to flush data to container
+   * @throws IOException
+   */
+  CBlockLocalCache(
+      Configuration conf, String volumeName,
+      String userName, List<Pipeline> containerPipelines, int blockSize,
+      long volumeSize, ContainerCacheFlusher flusher) throws IOException {
+    this.conf = conf;
+    this.userName = userName;
+    this.volumeName = volumeName;
+    this.blockSize = blockSize;
+    this.flusher = flusher;
+    this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO,
+        DFS_CBLOCK_TRACE_IO_DEFAULT);
+    this.enableShortCircuitIO = conf.getBoolean(
+        DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO,
+        DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT);
+    dbPath = Paths.get(conf.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
+        DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT), userName, volumeName).toFile();
+
+    if (!dbPath.exists() && !dbPath.mkdirs()) {
+      LOG.error("Unable to create the cache paths. Path: {}", dbPath);
+      throw new IllegalArgumentException("Unable to create paths. Path: " +
+          dbPath);
+    }
+    cacheDB = flusher.getCacheDB(dbPath.toString());
+    this.containerList = containerPipelines.toArray(new
+        Pipeline[containerPipelines.size()]);
+    this.volumeSize = volumeSize;
+
+    blockWriter = new AsyncBlockWriter(conf, this);
+    blockReader = new SyncBlockReader(conf, this);
+    if (this.traceEnabled) {
+      getTracer().info("Task=StartingCache");
+    }
+  }
+
+  private void setClientManager(XceiverClientManager manager) {
+    this.clientManager = manager;
+  }
+
+  private void setCblockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+    this.cblockTargetMetrics = targetMetrics;
+  }
+
+  /**
+   * Returns new builder class that builds a CBlockLocalCache.
+   *
+   * @return Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public void processDirtyMessage(String fileName) {
+    flusher.processDirtyBlocks(dbPath.toString(), fileName);
+  }
+
+  /**
+   * Get usable disk space.
+   *
+   * @param dbPathString - Path to db
+   * @return long bytes remaining.
+   */
+  private static long getRemainingDiskSpace(String dbPathString) {
+    try {
+      URI fileUri = new URI("file:///");
+      Path dbPath = Paths.get(fileUri).resolve(dbPathString);
+      FileStore disk = Files.getFileStore(dbPath);
+      return disk.getUsableSpace();
+    } catch (URISyntaxException | IOException ex) {
+      LOG.error("Unable to get free space on for path :" + dbPathString);
+    }
+    return 0L;
+  }
+
+  /**
+   * Returns the current maximum cache size.
+   *
+   * @return - Cache Size
+   */
+  public long getCurrentCacheSize() {
+    return currentCacheSize;
+  }
+
+  /**
+   * Sets the current maximum cache size.
+   *
+   * @param currentCacheSize - Max current Cache Size.
+   */
+  public void setCurrentCacheSize(long currentCacheSize) {
+    this.currentCacheSize = currentCacheSize;
+  }
+
+  /**
+   * True if block tracing is enabled.
+   *
+   * @return - bool
+   */
+  public boolean isTraceEnabled() {
+    return traceEnabled;
+  }
+
+  /**
+   * Checks if Short Circuit I/O is enabled.
+   *
+   * @return - true if it is enabled.
+   */
+  public boolean isShortCircuitIOEnabled() {
+    return enableShortCircuitIO;
+  }
+
+  /**
+   * Returns the default block size of this device.
+   *
+   * @return - int
+   */
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  /**
+   * Gets the client manager.
+   *
+   * @return XceiverClientManager
+   */
+  public XceiverClientManager getClientManager() {
+    return clientManager;
+  }
+
+  /**
+   * Checks if the block is cached; if yes, returns the cached block.
+   * Otherwise, loads it from the data source and puts it into the cache.
+   *
+   * @param blockID - Block ID
+   * @return the block associated with the blockID
+   */
+  @Override
+  public LogicalBlock get(long blockID) throws IOException {
+    cblockTargetMetrics.incNumReadOps();
+    return blockReader.readBlock(blockID);
+  }
+
+  /**
+   * Puts the block into the cache and the remote container.
+   *
+   * @param blockID - BlockID
+   * @param data - byte[]
+   */
+  @Override
+  public void put(long blockID, byte[] data) throws IOException {
+    cblockTargetMetrics.incNumWriteOps();
+    LogicalBlock block = new DiskBlock(blockID, data, false);
+    blockWriter.writeBlock(block);
+  }
+
+  @Override
+  public void flush() throws IOException {
+
+  }
+
+  @Override
+  public void start() throws IOException {
+    flusher.register(getDbPath().getPath(), containerList);
+    blockWriter.start();
+  }
+
+  @Override
+  public void stop() throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+    blockReader.shutdown();
+    blockWriter.shutdown();
+    this.flusher.releaseCacheDB(dbPath.toString());
+    if (this.traceEnabled) {
+      getTracer().info("Task=ShutdownCache");
+    }
+  }
+
+  /**
+   * Returns true if cache still has blocks pending to write.
+   *
+   * @return false if we have no pending blocks to write.
+   */
+  @Override
+  public boolean isDirtyCache() {
+    return false;
+  }
+
+  /**
+   * Returns the local cache DB.
+   *
+   * @return - DB
+   */
+  LevelDBStore getCacheDB() {
+    return this.cacheDB;
+  }
+
+  /**
+   * Returns the current userName.
+   *
+   * @return - UserName
+   */
+  String getUserName() {
+    return this.userName;
+  }
+
+  /**
+   * Returns the volume name.
+   *
+   * @return VolumeName.
+   */
+  String getVolumeName() {
+    return this.volumeName;
+  }
+
+  /**
+   * Returns the target metrics.
+   *
+   * @return CBlock Target Metrics.
+   */
+  CBlockTargetMetrics getTargetMetrics() {
+    return this.cblockTargetMetrics;
+  }
+
+  /**
+   * Returns the pipeline to use given a container.
+   *
+   * @param blockId - blockID
+   * @return - pipeline.
+   */
+  Pipeline getPipeline(long blockId) {
+    int containerIdx = (int) (blockId % containerList.length);
+    long cBlockIndex =
+        Longs.fromByteArray(containerList[containerIdx].getData());
+    if (cBlockIndex > 0) {
+      // This catches the case when we get a wrong container in the ordering
+      // of the containers.
+      Preconditions.checkState(containerIdx % cBlockIndex == 0,
+          "The container ID computed should match with the container index " +
+              "returned from cBlock Server.");
+    }
+    return containerList[containerIdx];
+  }
+
+  String getTraceID(long blockID) {
+    return flusher.getTraceID(dbPath, blockID);
+  }
+
+  /**
+   * Returns tracer.
+   *
+   * @return - Logger
+   */
+  Logger getTracer() {
+    return TRACER;
+  }
+
+  /**
+   * Builder class for CBlocklocalCache.
+   */
+  public static class Builder {
+    private Configuration configuration;
+    private String userName;
+    private String volumeName;
+    private List<Pipeline> pipelines;
+    private XceiverClientManager clientManager;
+    private int blockSize;
+    private long volumeSize;
+    private ContainerCacheFlusher flusher;
+    private CBlockTargetMetrics metrics;
+
+    /**
+     * Ctor.
+     */
+    Builder() {
+    }
+
+    /**
+     * Computes a cache size based on the configuration and available disk
+     * space.
+     *
+     * @param configuration - Config
+     * @param volumeSize - Size of Volume
+     * @param blockSize - Size of the block
+     * @return - cache size in bytes.
+     */
+    private static long computeCacheSize(Configuration configuration,
+        long volumeSize, int blockSize) {
+      long cacheSize = 0;
+      String dbPath = configuration.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
+          DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT);
+      if (StringUtils.isBlank(dbPath)) {
+        return cacheSize;
+      }
+      long spaceRemaining = getRemainingDiskSpace(dbPath);
+      double cacheRatio = 1.0;
+
+      if (spaceRemaining < volumeSize) {
+        cacheRatio = (double)spaceRemaining / volumeSize;
+      }
+
+      // if the cache is going to be at least 10% of the volume size it is
+      // worth doing, otherwise skip creating the cache.
+      if (cacheRatio >= 0.10) {
+        cacheSize = (long) (volumeSize * cacheRatio);
+      }
+      return cacheSize;
+    }
+
+    /**
+     * Sets the Config to be used by this cache.
+     *
+     * @param conf - Config
+     * @return Builder
+     */
+    public Builder setConfiguration(Configuration conf) {
+      this.configuration = conf;
+      return this;
+    }
+
+    /**
+     * Sets the user name who is the owner of this volume.
+     *
+     * @param user - name of the owner, please note this is not the current
+     * user name.
+     * @return - Builder
+     */
+    public Builder setUserName(String user) {
+      this.userName = user;
+      return this;
+    }
+
+    /**
+     * Sets the VolumeName.
+     *
+     * @param volume - Name of the volume
+     * @return Builder
+     */
+    public Builder setVolumeName(String volume) {
+      this.volumeName = volume;
+      return this;
+    }
+
+    /**
+     * Sets the Pipelines that form this volume.
+     *
+     * @param pipelineList - list of pipelines
+     * @return Builder
+     */
+    public Builder setPipelines(List<Pipeline> pipelineList) {
+      this.pipelines = pipelineList;
+      return this;
+    }
+
+    /**
+     * Sets the Client Manager that manages the communication with containers.
+     *
+     * @param xceiverClientManager - clientManager.
+     * @return - Builder
+     */
+    public Builder setClientManager(XceiverClientManager xceiverClientManager) {
+      this.clientManager = xceiverClientManager;
+      return this;
+    }
+
+    /**
+     * Sets the block size -- Typical sizes are 4KB, 8KB etc.
+     *
+     * @param size - BlockSize.
+     * @return - Builder
+     */
+    public Builder setBlockSize(int size) {
+      this.blockSize = size;
+      return this;
+    }
+
+    /**
+     * Sets the volumeSize.
+     *
+     * @param size - VolumeSize
+     * @return - Builder
+     */
+    public Builder setVolumeSize(long size) {
+      this.volumeSize = size;
+      return this;
+    }
+
+    /**
+     * Set flusher.
+     * @param containerCacheFlusher - cache Flusher
+     * @return Builder.
+     */
+    public Builder setFlusher(ContainerCacheFlusher containerCacheFlusher) {
+      this.flusher = containerCacheFlusher;
+      return this;
+    }
+
+    /**
+     * Sets the cblock Metrics.
+     *
+     * @param targetMetrics - CBlock Target Metrics
+     * @return - Builder
+     */
+    public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+      this.metrics = targetMetrics;
+      return this;
+    }
+
+    /**
+     * Constructs a CBlockLocalCache.
+     *
+     * @return the CBlockLocalCache with the preset properties.
+     * @throws IOException
+     */
+    public CBlockLocalCache build() throws IOException {
+      Preconditions.checkNotNull(this.configuration, "A valid configuration " +
+          "is needed");
+      Preconditions.checkState(StringUtils.isNotBlank(userName), "A valid " +
+          "username is needed");
+      Preconditions.checkState(StringUtils.isNotBlank(volumeName), " A valid" +
+          " volume name is needed");
+      Preconditions.checkNotNull(this.pipelines, "Pipelines cannot be null");
+      Preconditions.checkState(this.pipelines.size() > 0, "At least one " +
+          "pipeline location is needed for a volume");
+
+      for (int x = 0; x < pipelines.size(); x++) {
+        Preconditions.checkNotNull(pipelines.get(x).getData(), "cBlock " +
+            "relies on private data on the pipeline, null data found.");
+      }
+
+      Preconditions.checkNotNull(clientManager, "Client Manager cannot be " +
+          "null");
+      Preconditions.checkState(blockSize > 0, " Block size has to be a " +
+          "number greater than 0");
+
+      Preconditions.checkState(volumeSize > 0, "Volume Size cannot be less " +
+          "than 1");
+      Preconditions.checkNotNull(this.flusher, "Flusher cannot be null.");
+
+      CBlockLocalCache cache = new CBlockLocalCache(this.configuration,
+          this.volumeName, this.userName, this.pipelines, blockSize,
+          volumeSize, flusher);
+      cache.setCblockTargetMetrics(this.metrics);
+      cache.setClientManager(this.clientManager);
+
+      // TODO : Support user configurable maximum size.
+      long cacheSize = computeCacheSize(this.configuration, this.volumeSize,
+          this.blockSize);
+      cache.setCurrentCacheSize(cacheSize);
+      return cache;
+    }
+  }
+}
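
A hedged usage sketch of the builder above; the configuration, pipelines,
client manager, flusher and metrics objects are assumed to be created by the
caller (placeholders here), and the user/volume names and sizes are
illustrative values, not defaults from this patch.

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
    import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
    import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.scm.XceiverClientManager;
    import org.apache.hadoop.scm.container.common.helpers.Pipeline;

    /** Illustrative wiring of CBlockLocalCache via its builder. */
    public final class LocalCacheUsageSketch {
      static CBlockLocalCache openCache(Configuration conf,
          List<Pipeline> pipelines, XceiverClientManager clientManager,
          ContainerCacheFlusher flusher, CBlockTargetMetrics metrics)
          throws IOException {
        CBlockLocalCache cache = CBlockLocalCache.newBuilder()
            .setConfiguration(conf)
            .setUserName("bilbo")                // placeholder owner name
            .setVolumeName("volume1")            // placeholder volume name
            .setPipelines(pipelines)
            .setClientManager(clientManager)
            .setBlockSize(4 * 1024)              // e.g. 4 KB blocks
            .setVolumeSize(1L << 30)             // e.g. 1 GB volume
            .setFlusher(flusher)
            .setCBlockTargetMetrics(metrics)
            .build();
        cache.start();
        return cache;
      }
    }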

