http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
deleted file mode 100644
index 62e328e..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkResponseProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.StandardOpenOption;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
-
-/**
- * Utility methods for chunk operations for KeyValue container.
- */
-public final class ChunkUtils {
-
-  /** Never constructed. **/
-  private ChunkUtils() {
-
-  }
-
-  /**
-   * Writes the data in chunk Info to the specified location in the chunkfile.
-   *
-   * @param chunkFile - File to write data to.
-   * @param chunkInfo - Chunk metadata (offset, length, checksum).
-   * @param data - The data buffer.
-   * @param volumeIOStats - Volume I/O statistics to update.
-   * @throws StorageContainerException
-   */
-  public static void writeData(File chunkFile, ChunkInfo chunkInfo,
-                               byte[] data, VolumeIOStats volumeIOStats) throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
-
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-    if (data.length != chunkInfo.getLen()) {
-      String err = String.format("data array does not match the length " +
-              "specified. DataLen: %d Byte Array: %d",
-          chunkInfo.getLen(), data.length);
-      log.error(err);
-      throw new StorageContainerException(err, INVALID_WRITE_SIZE);
-    }
-
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-
-    try {
-      if (chunkInfo.getChecksum() != null &&
-          !chunkInfo.getChecksum().isEmpty()) {
-        verifyChecksum(chunkInfo, data, log);
-      }
-
-      long writeTimeStart = Time.monotonicNow();
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.CREATE,
-              StandardOpenOption.WRITE,
-              StandardOpenOption.SPARSE,
-              StandardOpenOption.SYNC);
-      lock = file.lock().get();
-      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
-      // Increment volumeIO stats here.
-      volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
-      volumeIOStats.incWriteOpCount();
-      volumeIOStats.incWriteBytes(size);
-      if (size != data.length) {
-        log.error("Invalid write size found. Size:{}  Expected: {} ", size,
-            data.length);
-        throw new StorageContainerException("Invalid write size found. " +
-            "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
-      }
-    } catch (StorageContainerException ex) {
-      throw ex;
-    } catch(IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("Unable to release lock ??, Fatal Error.");
-          throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
-
-        }
-      }
-      if (file != null) {
-        try {
-          file.close();
-        } catch (IOException e) {
-          throw new StorageContainerException("Error closing chunk file",
-              e, CONTAINER_INTERNAL_ERROR);
-        }
-      }
-    }
-  }
-
-  /**
-   * Reads data from an existing chunk file.
-   *
-   * @param chunkFile - file where data lives.
-   * @param data - chunk definition.
-   * @param volumeIOStats - Volume I/O statistics to update.
-   * @return ByteBuffer
-   * @throws StorageContainerException
-   * @throws ExecutionException
-   * @throws InterruptedException
-   */
-  public static ByteBuffer readData(File chunkFile, ChunkInfo data,
-                                    VolumeIOStats volumeIOStats)
-      throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-    if (!chunkFile.exists()) {
-      log.error("Unable to find the chunk file. chunk info : {}",
-          data.toString());
-      throw new StorageContainerException("Unable to find the chunk file. " +
-          "chunk info " +
-          data.toString(), UNABLE_TO_FIND_CHUNK);
-    }
-
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-    try {
-      long readStartTime = Time.monotonicNow();
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.READ);
-      lock = file.lock(data.getOffset(), data.getLen(), true).get();
-
-      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
-      file.read(buf, data.getOffset()).get();
-
-      // Increment volumeIO stats here.
-      volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
-      volumeIOStats.incReadOpCount();
-      volumeIOStats.incReadBytes(data.getLen());
-      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
-        verifyChecksum(data, buf.array(), log);
-      }
-      return buf;
-    } catch (IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("I/O error is lock release.");
-        }
-      }
-      if (file != null) {
-        IOUtils.closeStream(file);
-      }
-    }
-  }
-
-  /**
-   * Verifies the checksum of a chunk against the data buffer.
-   *
-   * @param chunkInfo - Chunk Info.
-   * @param data - data buffer
-   * @param log - log
-   * @throws NoSuchAlgorithmException
-   * @throws StorageContainerException
-   */
-  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
-      log) throws NoSuchAlgorithmException, StorageContainerException {
-    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha.update(data);
-    if (!Hex.encodeHexString(sha.digest()).equals(
-        chunkInfo.getChecksum())) {
-      log.error("Checksum mismatch. Provided: {} , computed: {}",
-          chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
-      throw new StorageContainerException("Checksum mismatch. Provided: " +
-          chunkInfo.getChecksum() + " , computed: " +
-          DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH);
-    }
-  }
-
-  /**
-   * Checks whether a write to the given chunk file would overwrite existing
-   * data and, if so, whether that overwrite is explicitly permitted.
-   *
-   * @param chunkFile - chunkFile to write data into.
-   * @param info - chunk info.
-   * @return boolean isOverwrite
-   * @throws StorageContainerException
-   */
-  public static boolean validateChunkForOverwrite(File chunkFile,
-      ChunkInfo info) throws StorageContainerException {
-
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-    if (isOverWriteRequested(chunkFile, info)) {
-      if (!isOverWritePermitted(info)) {
-        log.error("Rejecting write chunk request. Chunk overwrite " +
-            "without explicit request. {}", info.toString());
-        throw new StorageContainerException("Rejecting write chunk request. " +
-            "OverWrite flag required." + info.toString(),
-            OVERWRITE_FLAG_REQUIRED);
-      }
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Validates that Path to chunk file exists.
-   *
-   * @param containerData - Container Data
-   * @param info - Chunk info
-   * @return - File.
-   * @throws StorageContainerException
-   */
-  public static File getChunkFile(KeyValueContainerData containerData,
-                                  ChunkInfo info) throws
-      StorageContainerException {
-
-    Preconditions.checkNotNull(containerData, "Container data can't be null");
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-    String chunksPath = containerData.getChunksPath();
-    if (chunksPath == null) {
-      log.error("Chunks path is null in the container data");
-      throw new StorageContainerException("Unable to get Chunks directory.",
-          UNABLE_TO_FIND_DATA_DIR);
-    }
-    File chunksLoc = new File(chunksPath);
-    if (!chunksLoc.exists()) {
-      log.error("Chunks path does not exist");
-      throw new StorageContainerException("Unable to get Chunks directory.",
-          UNABLE_TO_FIND_DATA_DIR);
-    }
-
-    return chunksLoc.toPath().resolve(info.getChunkName()).toFile();
-  }
-
-  /**
-   * Checks if we are getting a request to overwrite an existing range of
-   * chunk.
-   *
-   * @param chunkFile - File
-   * @param chunkInfo - Buffer to write
-   * @return bool
-   */
-  public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
-      chunkInfo) {
-
-    if (!chunkFile.exists()) {
-      return false;
-    }
-
-    long offset = chunkInfo.getOffset();
-    return offset < chunkFile.length();
-  }
-
-  /**
-   * Overwrite is permitted if and only if the user explicitly asks for it. We
-   * permit this iff the key/value pair contains a flag called
-   * [OverWriteRequested, true].
-   *
-   * @param chunkInfo - Chunk info
-   * @return true if the user asks for it.
-   */
-  public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
-    String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
-    return (overWrite != null) &&
-        (!overWrite.isEmpty()) &&
-        (Boolean.valueOf(overWrite));
-  }
-
-  /**
-   * Returns a success response for chunk operations such as write and delete
-   * chunk, which carry no payload on success.
-   *
-   * @param msg Request
-   * @return Response.
-   */
-  public static ContainerCommandResponseProto getChunkResponseSuccess(
-      ContainerCommandRequestProto msg) {
-    return ContainerUtils.getSuccessResponse(msg);
-  }
-
-  /**
-   * Gets a response to the read chunk calls.
-   *
-   * @param msg - Msg
-   * @param data - Data
-   * @param info - Info
-   * @return Response.
-   */
-  public static ContainerCommandResponseProto getReadChunkResponse(
-      ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
-    Preconditions.checkNotNull(msg);
-    Preconditions.checkNotNull(data, "Chunk data is null");
-    Preconditions.checkNotNull(info, "Chunk Info is null");
-
-    ReadChunkResponseProto.Builder response =
-        ReadChunkResponseProto.newBuilder();
-    response.setChunkData(info.getProtoBufMessage());
-    response.setData(ByteString.copyFrom(data));
-    response.setBlockID(msg.getReadChunk().getBlockID());
-
-    ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setReadChunk(response);
-    return builder.build();
-  }
-}
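
For context, the deleted ChunkUtils.writeData follows a common pattern: open an
AsynchronousFileChannel, take an exclusive file lock, write the buffer at the
chunk offset, and fail if the byte count written differs from the buffer
length. A minimal standalone sketch of that pattern (class and file names are
illustrative, not part of the patch):

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.FileLock;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.ExecutionException;

    public final class AsyncWriteSketch {

      /** Writes data at the given offset; throws on a short write. */
      static void writeAt(File file, byte[] data, long offset)
          throws IOException, InterruptedException, ExecutionException {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
            file.toPath(),
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE,
            StandardOpenOption.SPARSE,
            StandardOpenOption.SYNC)) {
          FileLock lock = channel.lock().get(); // exclusive whole-file lock
          try {
            int written = channel.write(ByteBuffer.wrap(data), offset).get();
            if (written != data.length) {
              throw new IOException(
                  "Short write: " + written + " of " + data.length + " bytes");
            }
          } finally {
            lock.release();
          }
        }
      }

      public static void main(String[] args) throws Exception {
        writeAt(new File("chunk.tmp"),
            "hello".getBytes(StandardCharsets.UTF_8), 0);
      }
    }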

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
deleted file mode 100644
index 0a81ed8..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.Storage;
-
-import java.io.File;
-
-/**
- * Class which provides utility methods for container locations.
- */
-public final class KeyValueContainerLocationUtil {
-
-  /* Never constructed. */
-  private KeyValueContainerLocationUtil() {
-
-  }
-  /**
-   * Returns Container Metadata Location.
-   * @param hddsVolumeDir base dir of the hdds volume where scm directories
-   *                      are stored
-   * @param scmId
-   * @param containerId
-   * @return containerMetadata Path to container metadata location where
-   * .container file will be stored.
-   */
-  public static File getContainerMetaDataPath(String hddsVolumeDir, String scmId,
-                                              long containerId) {
-    String containerMetaDataPath = getBaseContainerLocation(hddsVolumeDir, scmId,
-        containerId);
-    containerMetaDataPath = containerMetaDataPath + File.separator +
-        OzoneConsts.CONTAINER_META_PATH;
-    return new File(containerMetaDataPath);
-  }
-
-
-  /**
-   * Returns Container Chunks Location.
-   * @param baseDir
-   * @param scmId
-   * @param containerId
-   * @return chunksPath
-   */
-  public static File getChunksLocationPath(String baseDir, String scmId,
-                                           long containerId) {
-    String chunksPath = getBaseContainerLocation(baseDir, scmId, containerId)
-        + File.separator + OzoneConsts.STORAGE_DIR_CHUNKS;
-    return new File(chunksPath);
-  }
-
-  /**
-   * Returns base directory for specified container.
-   * @param hddsVolumeDir
-   * @param scmId
-   * @param containerId
-   * @return base directory for container.
-   */
-  private static String getBaseContainerLocation(String hddsVolumeDir, String scmId,
-                                        long containerId) {
-    Preconditions.checkNotNull(hddsVolumeDir, "Base Directory cannot be null");
-    Preconditions.checkNotNull(scmId, "scmUuid cannot be null");
-    Preconditions.checkState(containerId >= 0,
-        "Container Id cannot be negative.");
-
-    String containerSubDirectory = getContainerSubDirectory(containerId);
-
-    String containerMetaDataPath = hddsVolumeDir  + File.separator + scmId +
-        File.separator + Storage.STORAGE_DIR_CURRENT + File.separator +
-        containerSubDirectory + File.separator + containerId;
-
-    return containerMetaDataPath;
-  }
-
-  /**
-   * Returns subdirectory, where this container needs to be placed.
-   * @param containerId
-   * @return container sub directory
-   */
-  private static String getContainerSubDirectory(long containerId){
-    int directory = (int) ((containerId >> 9) & 0xFF);
-    return Storage.CONTAINER_DIR + directory;
-  }
-
-  /**
-   * Return containerDB File.
-   */
-  public static File getContainerDBFile(File containerMetaDataPath,
-      long containerID) {
-    return new File(containerMetaDataPath, containerID + OzoneConsts
-        .DN_CONTAINER_DB);
-  }
-}
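
The bucketing in getContainerSubDirectory spreads containers across at most 256
subdirectories, with each run of 512 consecutive container IDs sharing a bucket
((containerId >> 9) & 0xFF). A self-contained sketch of the resulting layout,
with placeholder values standing in for the volume root, SCM id, and the
Storage directory constants:

    import java.io.File;

    public final class ContainerPathSketch {
      public static void main(String[] args) {
        long containerId = 123456L;
        String hddsVolumeDir = "/data/hdds"; // placeholder volume root
        String scmId = "scm-uuid";           // placeholder SCM id

        // 512 consecutive ids per bucket, 256 buckets in total.
        int bucket = (int) ((containerId >> 9) & 0xFF);

        String base = hddsVolumeDir + File.separator + scmId
            + File.separator + "current"               // stands in for STORAGE_DIR_CURRENT
            + File.separator + "containerDir" + bucket // stands in for CONTAINER_DIR
            + File.separator + containerId;

        System.out.println(base + File.separator + "metadata");
        System.out.println(base + File.separator + "chunks");
      }
    }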

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
deleted file mode 100644
index 4f2b3a2..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue.helpers;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class which defines utility methods for KeyValueContainer.
- */
-
-public final class KeyValueContainerUtil {
-
-  /* Never constructed. */
-  private KeyValueContainerUtil() {
-
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      KeyValueContainerUtil.class);
-
-  /**
-   * Creates metadata path, chunks path and metadata DB for the specified
-   * container.
-   *
-   * @param containerMetaDataPath
-   * @throws IOException
-   */
-  public static void createContainerMetaData(File containerMetaDataPath, File
-      chunksPath, File dbFile, Configuration conf) throws IOException {
-    Preconditions.checkNotNull(containerMetaDataPath);
-    Preconditions.checkNotNull(conf);
-
-    if (!containerMetaDataPath.mkdirs()) {
-      LOG.error("Unable to create directory for metadata storage. Path: {}",
-          containerMetaDataPath);
-      throw new IOException("Unable to create directory for metadata storage." 
+
-          " Path: " + containerMetaDataPath);
-    }
-    MetadataStore store = MetadataStoreBuilder.newBuilder().setConf(conf)
-        .setCreateIfMissing(true).setDbFile(dbFile).build();
-
-    // we close since the SCM pre-creates containers.
-    // we will open and put Db handle into a cache when keys are being created
-    // in a container.
-
-    store.close();
-
-    if (!chunksPath.mkdirs()) {
-      LOG.error("Unable to create chunks directory Container {}",
-          chunksPath);
-      //clean up container metadata path and metadata db
-      FileUtils.deleteDirectory(containerMetaDataPath);
-      FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
-      throw new IOException("Unable to create directory for data storage." +
-          " Path: " + chunksPath);
-    }
-  }
-
-  /**
-   * remove Container if it is empty.
-   * <p/>
-   * There are three things we need to delete.
-   * <p/>
-   * 1. Container file and metadata file. 2. The Level DB file 3. The path that
-   * we created on the data location.
-   *
-   * @param containerData - Data of the container to remove.
-   * @param conf - configuration of the cluster.
-   * @param forceDelete - whether this container should be deleted forcibly.
-   * @throws IOException
-   */
-  public static void removeContainer(KeyValueContainerData containerData,
-                                     Configuration conf, boolean forceDelete)
-      throws IOException {
-    Preconditions.checkNotNull(containerData);
-    File containerMetaDataPath = new File(containerData
-        .getMetadataPath());
-    File chunksPath = new File(containerData.getChunksPath());
-
-    // Close the DB connection and remove the DB handler from cache
-    BlockUtils.removeDB(containerData, conf);
-
-    // Delete the Container MetaData path.
-    FileUtils.deleteDirectory(containerMetaDataPath);
-
-    //Delete the Container Chunks Path.
-    FileUtils.deleteDirectory(chunksPath);
-
-    //Delete Container directory
-    FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
-  }
-
-  /**
-   * Returns a ReadContainer Response.
-   *
-   * @param request Request
-   * @param containerData - data
-   * @return Response.
-   */
-  public static ContainerCommandResponseProto getReadContainerResponse(
-      ContainerCommandRequestProto request,
-      KeyValueContainerData containerData) {
-    Preconditions.checkNotNull(containerData);
-
-    ContainerProtos.ReadContainerResponseProto.Builder response =
-        ContainerProtos.ReadContainerResponseProto.newBuilder();
-    response.setContainerData(containerData.getProtoBufMessage());
-
-    ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(request);
-    builder.setReadContainer(response);
-    return builder.build();
-  }
-
-  /**
-   * Parse KeyValueContainerData and verify checksum.
-   * @param kvContainerData
-   * @param config
-   * @throws IOException
-   */
-  public static void parseKVContainerData(KeyValueContainerData kvContainerData,
-      Configuration config) throws IOException {
-
-    long containerID = kvContainerData.getContainerID();
-    File metadataPath = new File(kvContainerData.getMetadataPath());
-
-    // Verify Checksum
-    ContainerUtils.verifyChecksum(kvContainerData);
-
-    File dbFile = KeyValueContainerLocationUtil.getContainerDBFile(
-        metadataPath, containerID);
-    if (!dbFile.exists()) {
-      LOG.error("Container DB file is missing for ContainerID {}. " +
-          "Skipping loading of this container.", containerID);
-      // Don't further process this container, as it is missing db file.
-      return;
-    }
-    kvContainerData.setDbFile(dbFile);
-
-    MetadataStore metadata = BlockUtils.getDB(kvContainerData, config);
-    long bytesUsed = 0;
-    List<Map.Entry<byte[], byte[]>> liveKeys = metadata
-        .getRangeKVs(null, Integer.MAX_VALUE,
-            MetadataKeyFilters.getNormalKeyFilter());
-    bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
-      BlockData blockData;
-      try {
-        blockData = BlockUtils.getBlockData(e.getValue());
-        return blockData.getSize();
-      } catch (IOException ex) {
-        return 0L;
-      }
-    }).sum();
-    kvContainerData.setBytesUsed(bytesUsed);
-    kvContainerData.setKeyCount(liveKeys.size());
-  }
-
-  /**
-   * Returns the path where data or chunks live for a given container.
-   *
-   * @param kvContainerData - KeyValueContainerData
-   * @return - Path to the chunks directory
-   */
-  public static Path getDataDirectory(KeyValueContainerData kvContainerData) {
-
-    String chunksPath = kvContainerData.getChunksPath();
-    Preconditions.checkNotNull(chunksPath);
-
-    return Paths.get(chunksPath);
-  }
-
-  /**
-   * Container metadata directory -- here is where the level DB and
-   * .container file lives.
-   *
-   * @param kvContainerData - KeyValueContainerData
-   * @return Path to the metadata directory
-   */
-  public static Path getMetadataDirectory(
-      KeyValueContainerData kvContainerData) {
-
-    String metadataPath = kvContainerData.getMetadataPath();
-    Preconditions.checkNotNull(metadataPath);
-
-    return Paths.get(metadataPath);
-
-  }
-}
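
Note how parseKVContainerData recomputes bytesUsed: it scans every live key in
the container DB and sums the decoded block sizes in parallel, counting a block
as 0 bytes if its value fails to decode. A standalone sketch of that
aggregation, with a stub decoder standing in for BlockUtils.getBlockData:

    import java.util.List;
    import java.util.Map;

    public final class BytesUsedSketch {

      /** Stub decoder: the real code parses a BlockData protobuf instead. */
      static long decodeSize(byte[] value) {
        return value.length;
      }

      public static void main(String[] args) {
        List<Map.Entry<byte[], byte[]>> liveKeys = List.of(
            Map.entry(new byte[]{1}, new byte[100]),
            Map.entry(new byte[]{2}, new byte[250]));

        // Same shape as the deleted code: parallel sum over all live keys.
        long bytesUsed = liveKeys.parallelStream()
            .mapToLong(e -> decodeSize(e.getValue()))
            .sum();

        System.out.println("bytesUsed=" + bytesUsed
            + " keyCount=" + liveKeys.size());
      }
    }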

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
deleted file mode 100644
index 3495363..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-
-/**
- * File Utils are helper routines used by putSmallFile and getSmallFile
- * RPCs.
- */
-public final class SmallFileUtils {
-  /**
-   * Never Constructed.
-   */
-  private SmallFileUtils() {
-  }
-
-  /**
-   * Gets a response for the putSmallFile RPC.
-   * @param msg - ContainerCommandRequestProto
-   * @return - ContainerCommandResponseProto
-   */
-  public static ContainerCommandResponseProto getPutFileResponseSuccess(
-      ContainerCommandRequestProto msg) {
-    ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
-        ContainerProtos.PutSmallFileResponseProto.newBuilder();
-    ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setCmdType(ContainerProtos.Type.PutSmallFile);
-    builder.setPutSmallFile(getResponse);
-    return  builder.build();
-  }
-
-  /**
-   * Gets a response to the read small file call.
-   * @param msg - Msg
-   * @param data  - Data
-   * @param info  - Info
-   * @return    Response.
-   */
-  public static ContainerCommandResponseProto getGetSmallFileResponseSuccess(
-      ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
-    Preconditions.checkNotNull(msg);
-
-    ContainerProtos.ReadChunkResponseProto.Builder readChunkresponse =
-        ContainerProtos.ReadChunkResponseProto.newBuilder();
-    readChunkresponse.setChunkData(info.getProtoBufMessage());
-    readChunkresponse.setData(ByteString.copyFrom(data));
-    readChunkresponse.setBlockID(msg.getGetSmallFile().getBlock().getBlockID());
-
-    ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
-        ContainerProtos.GetSmallFileResponseProto.newBuilder();
-    getSmallFile.setData(readChunkresponse.build());
-    ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setCmdType(ContainerProtos.Type.GetSmallFile);
-    builder.setGetSmallFile(getSmallFile);
-    return builder.build();
-  }
-
-}
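
The helper above illustrates the response-wrapping convention used throughout
these classes: the innermost payload (a ReadChunk message) is built first, then
wrapped in a GetSmallFile response, then in the outer command response. A toy
sketch of the same nesting, with plain classes standing in for the generated
protobuf builders (illustrative only):

    /** Stand-in for the generated ReadChunk message. */
    final class ReadChunkPayload {
      final byte[] data;
      ReadChunkPayload(byte[] data) {
        this.data = data.clone(); // defensive copy, like ByteString.copyFrom
      }
    }

    /** Stand-in for the generated GetSmallFile response. */
    final class GetSmallFilePayload {
      final ReadChunkPayload chunk;
      GetSmallFilePayload(ReadChunkPayload chunk) {
        this.chunk = chunk;
      }
    }

    public final class SmallFileResponseSketch {
      public static void main(String[] args) {
        // Build the innermost payload first, then wrap outward.
        ReadChunkPayload chunk = new ReadChunkPayload(new byte[]{1, 2, 3});
        GetSmallFilePayload response = new GetSmallFilePayload(chunk);
        System.out.println("payload bytes: " + response.chunk.data.length);
      }
    }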

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/package-info.java
deleted file mode 100644
index 041f485..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue.helpers;
-/**
- This package contains utility classes for KeyValue container type.
- **/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
deleted file mode 100644
index 54c15fb..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
-import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
-
-/**
- * This class is for performing block related operations on the KeyValue
- * Container.
- */
-public class BlockManagerImpl implements BlockManager {
-
-  static final Logger LOG = LoggerFactory.getLogger(BlockManagerImpl.class);
-
-  private Configuration config;
-
-  /**
-   * Constructs a Block Manager.
-   *
-   * @param conf - Ozone configuration
-   */
-  public BlockManagerImpl(Configuration conf) {
-    Preconditions.checkNotNull(conf, "Config cannot be null");
-    this.config = conf;
-  }
-
-  /**
-   * Puts or overwrites a block.
-   *
-   * @param container - Container for which block need to be added.
-   * @param data     - BlockData.
-   * @return length of the block.
-   * @throws IOException
-   */
-  public long putBlock(Container container, BlockData data) throws IOException {
-    Preconditions.checkNotNull(data, "BlockData cannot be null for put " +
-        "operation.");
-    Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
-        "cannot be negative");
-    // We are not locking the key manager since LevelDb serializes all actions
-    // against a single DB. We rely on DB level locking to avoid conflicts.
-    MetadataStore db = BlockUtils.getDB((KeyValueContainerData) container
-        .getContainerData(), config);
-
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage()
-        .toByteArray());
-
-    // Increment keycount here
-    container.getContainerData().incrKeyCount();
-    return data.getSize();
-  }
-
-  /**
-   * Gets an existing block.
-   *
-   * @param container - Container from which block need to be fetched.
-   * @param blockID - BlockID of the block.
-   * @return Key Data.
-   * @throws IOException
-   */
-  public BlockData getBlock(Container container, BlockID blockID)
-      throws IOException {
-    Preconditions.checkNotNull(blockID,
-        "BlockID cannot be null in GetBlock request");
-    Preconditions.checkNotNull(blockID.getContainerID(),
-        "Container name cannot be null");
-
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
-    MetadataStore db = BlockUtils.getDB(containerData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the block.",
-          NO_SUCH_BLOCK);
-    }
-    ContainerProtos.BlockData blockData =
-        ContainerProtos.BlockData.parseFrom(kData);
-    return BlockData.getFromProtoBuf(blockData);
-  }
-
-  /**
-   * Returns the length of the committed block.
-   *
-   * @param container - Container from which block need to be fetched.
-   * @param blockID - BlockID of the block.
-   * @return length of the block.
-   * @throws IOException in case, the block key does not exist in db.
-   */
-  @Override
-  public long getCommittedBlockLength(Container container, BlockID blockID)
-      throws IOException {
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
-    MetadataStore db = BlockUtils.getDB(containerData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the block.",
-          NO_SUCH_BLOCK);
-    }
-    ContainerProtos.BlockData blockData =
-        ContainerProtos.BlockData.parseFrom(kData);
-    return blockData.getSize();
-  }
-
-  /**
-   * Deletes an existing block.
-   *
-   * @param container - Container from which block need to be deleted.
-   * @param blockID - ID of the block.
-   * @throws StorageContainerException
-   */
-  public void deleteBlock(Container container, BlockID blockID) throws
-      IOException {
-    Preconditions.checkNotNull(blockID, "block ID cannot be null.");
-    Preconditions.checkState(blockID.getContainerID() >= 0,
-        "Container ID cannot be negative.");
-    Preconditions.checkState(blockID.getLocalID() >= 0,
-        "Local ID cannot be negative.");
-
-    KeyValueContainerData cData = (KeyValueContainerData) container
-        .getContainerData();
-    MetadataStore db = BlockUtils.getDB(cData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    // Note : There is a race condition here, since get and delete
-    // are not atomic. Leaving it here since the impact is refusing
-    // to delete a Block which might have just gotten inserted after
-    // the get check.
-    byte[] kKey = Longs.toByteArray(blockID.getLocalID());
-    byte[] kData = db.get(kKey);
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the block.",
-          NO_SUCH_BLOCK);
-    }
-    db.delete(kKey);
-
-    // Decrement blockcount here
-    container.getContainerData().decrKeyCount();
-  }
-
-  /**
-   * List blocks in a container.
-   *
-   * @param container - Container from which blocks need to be listed.
-   * @param startLocalID  - Key to start from, 0 to begin.
-   * @param count    - Number of blocks to return.
-   * @return List of Blocks that match the criteria.
-   */
-  @Override
-  public List<BlockData> listBlock(Container container, long startLocalID, int
-      count) throws IOException {
-    Preconditions.checkNotNull(container, "container cannot be null");
-    Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " +
-        "negative");
-    Preconditions.checkArgument(count > 0,
-        "Count must be a positive number.");
-    container.readLock();
-    List<BlockData> result = null;
-    KeyValueContainerData cData = (KeyValueContainerData) container
-        .getContainerData();
-    MetadataStore db = BlockUtils.getDB(cData, config);
-    result = new ArrayList<>();
-    byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
-    List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
-        startKeyInBytes, count, null);
-    for (Map.Entry<byte[], byte[]> entry : range) {
-      BlockData value = BlockUtils.getBlockData(entry.getValue());
-      BlockData data = new BlockData(value.getBlockID());
-      result.add(data);
-    }
-    return result;
-  }
-
-  /**
-   * Shutdown KeyValueContainerManager.
-   */
-  public void shutdown() {
-    BlockUtils.shutdownCache(ContainerCache.getInstance(config));
-  }
-
-}
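
Every lookup above keys the block by its localID serialized with
Longs.toByteArray, i.e. a big-endian 8-byte array, so keys sort numerically in
the store. A dependency-free sketch of the same encoding with a round-trip
check:

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public final class BlockKeySketch {

      /** Big-endian 8-byte encoding, equivalent to Guava's Longs.toByteArray. */
      static byte[] toKey(long localId) {
        return ByteBuffer.allocate(Long.BYTES).putLong(localId).array();
      }

      static long fromKey(byte[] key) {
        return ByteBuffer.wrap(key).getLong();
      }

      public static void main(String[] args) {
        long localId = 42L;
        byte[] key = toKey(localId);
        if (fromKey(key) != localId) {
          throw new IllegalStateException("round trip failed");
        }
        System.out.println(Arrays.toString(key)); // [0, 0, 0, 0, 0, 0, 0, 42]
      }
    }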

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
deleted file mode 100644
index ce317bd..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
-
-/**
- * This class is for performing chunk related operations.
- */
-public class ChunkManagerImpl implements ChunkManager {
-  static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-  /**
-   * writes a given chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block
-   * @param info - ChunkInfo
-   * @param data - data of the chunk
-   * @param stage - Stage of the Chunk operation
-   * @throws StorageContainerException
-   */
-  public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      byte[] data, ContainerProtos.Stage stage)
-      throws StorageContainerException {
-
-    try {
-
-      KeyValueContainerData containerData = (KeyValueContainerData) container
-          .getContainerData();
-      HddsVolume volume = containerData.getVolume();
-      VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
-
-      File chunkFile = ChunkUtils.getChunkFile(containerData, info);
-
-      boolean isOverwrite = ChunkUtils.validateChunkForOverwrite(
-          chunkFile, info);
-      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
-
-      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
-          info.getChunkName(), stage, chunkFile, tmpChunkFile);
-
-      switch (stage) {
-      case WRITE_DATA:
-        // Initially writes to temporary chunk file.
-        ChunkUtils.writeData(tmpChunkFile, info, data, volumeIOStats);
-        // No need to increment container stats here, as still data is not
-        // committed here.
-        break;
-      case COMMIT_DATA:
-        // commit the data, means move chunk data from temporary chunk file
-        // to actual chunk file.
-        commitChunk(tmpChunkFile, chunkFile);
-        // Increment container stats here, as we commit the data.
-        containerData.incrBytesUsed(info.getLen());
-        containerData.incrWriteCount();
-        containerData.incrWriteBytes(info.getLen());
-        break;
-      case COMBINED:
-        // directly write to the chunk file
-        ChunkUtils.writeData(chunkFile, info, data, volumeIOStats);
-        if (!isOverwrite) {
-          containerData.incrBytesUsed(info.getLen());
-        }
-        containerData.incrWriteCount();
-        containerData.incrWriteBytes(info.getLen());
-        break;
-      default:
-        throw new IOException("Can not identify write operation.");
-      }
-    } catch (StorageContainerException ex) {
-      throw ex;
-    } catch (NoSuchAlgorithmException ex) {
-      LOG.error("write data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ", ex,
-          NO_SUCH_ALGORITHM);
-    } catch (ExecutionException  | IOException ex) {
-      LOG.error("write data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ", ex,
-          CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("write data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ", e,
-          CONTAINER_INTERNAL_ERROR);
-    }
-  }
-
-  /**
-   * reads the data defined by a chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block.
-   * @param info - ChunkInfo.
-   * @return byte array
-   * @throws StorageContainerException
-   * TODO: Right now we do not support partial reads and writes of chunks.
-   * TODO: Explore if we need to do that for ozone.
-   */
-  public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
-      throws StorageContainerException {
-    try {
-      KeyValueContainerData containerData = (KeyValueContainerData) container
-          .getContainerData();
-      ByteBuffer data;
-      HddsVolume volume = containerData.getVolume();
-      VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
-
-      // Checking here, which layout version the container is, and reading
-      // the chunk file in that format.
-      // In version1, we verify checksum if it is available and return data
-      // of the chunk file.
-      if (containerData.getLayOutVersion() == ChunkLayOutVersion
-          .getLatestVersion().getVersion()) {
-        File chunkFile = ChunkUtils.getChunkFile(containerData, info);
-        data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
-        containerData.incrReadCount();
-        long length = chunkFile.length();
-        containerData.incrReadBytes(length);
-        return data.array();
-      }
-    } catch(NoSuchAlgorithmException ex) {
-      LOG.error("read data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ",
-          ex, NO_SUCH_ALGORITHM);
-    } catch (ExecutionException ex) {
-      LOG.error("read data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ",
-          ex, CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("read data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ",
-          e, CONTAINER_INTERNAL_ERROR);
-    }
-    return null;
-  }
-
-  /**
-   * Deletes a given chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block
-   * @param info - Chunk Info
-   * @throws StorageContainerException
-   */
-  public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
-      throws StorageContainerException {
-    Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
-    // Checking here, which layout version the container is, and performing
-    // deleting chunk operation.
-    // In version1, we have only chunk file.
-    if (containerData.getLayOutVersion() == ChunkLayOutVersion
-        .getLatestVersion().getVersion()) {
-      File chunkFile = ChunkUtils.getChunkFile(containerData, info);
-      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
-        FileUtil.fullyDelete(chunkFile);
-        containerData.decrBytesUsed(chunkFile.length());
-      } else {
-        LOG.error("Not Supported Operation. Trying to delete a " +
-            "chunk that is in shared file. chunk info : " + info.toString());
-        throw new StorageContainerException("Not Supported Operation. " +
-            "Trying to delete a chunk that is in shared file. chunk info : "
-            + info.toString(), UNSUPPORTED_REQUEST);
-      }
-    }
-  }
-
-  /**
-   * Shutdown the chunkManager.
-   *
-   * In the chunkManager we haven't acquired any resources, so nothing to do
-   * here.
-   */
-
-  public void shutdown() {
-    //TODO: need to revisit this during integration of container IO.
-  }
-
-  /**
-   * Returns the temporary chunkFile path.
-   * @param chunkFile
-   * @param info
-   * @return temporary chunkFile path
-   * @throws StorageContainerException
-   */
-  private File getTmpChunkFile(File chunkFile, ChunkInfo info)
-      throws StorageContainerException {
-    return new File(chunkFile.getParent(),
-        chunkFile.getName() +
-            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
-  }
-
-  /**
-   * Commit the chunk by renaming the temporary chunk file to chunk file.
-   * @param tmpChunkFile
-   * @param chunkFile
-   * @throws IOException
-   */
-  private void commitChunk(File tmpChunkFile, File chunkFile) throws
-      IOException {
-    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
-        StandardCopyOption.REPLACE_EXISTING);
-  }
-
-}
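
The two-stage write above (WRITE_DATA into a temporary file, COMMIT_DATA via
rename) is the classic write-then-rename commit: readers can never observe a
partially written chunk file. A minimal sketch of the commit step, using
illustrative file names:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public final class CommitChunkSketch {

      /** Publishes the fully written tmp file under its final name. */
      static void commit(Path tmpChunkFile, Path chunkFile) throws IOException {
        // Same call shape as the deleted commitChunk: the rename replaces
        // any existing chunk file in one step.
        Files.move(tmpChunkFile, chunkFile, StandardCopyOption.REPLACE_EXISTING);
      }

      public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("chunk", ".tmp");
        Files.write(tmp, "chunk-bytes".getBytes(StandardCharsets.UTF_8));
        Path finalFile = tmp.resolveSibling("chunk.data");
        commit(tmp, finalFile);
        System.out.println("committed: " + finalFile);
      }
    }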

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/package-info.java
deleted file mode 100644
index 564b50e..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue.impl;
-/**
- * Chunk manager and block manager implementations for keyvalue container type.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
deleted file mode 100644
index 35ed22a..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.interfaces;
-
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * BlockManager is for performing block-related operations on the container.
- */
-public interface BlockManager {
-
-  /**
-   * Puts or overwrites a block.
-   *
-   * @param container - Container to which the block needs to be added.
-   * @param data     - Block Data.
-   * @return length of the Block.
-   * @throws IOException
-   */
-  long putBlock(Container container, BlockData data) throws IOException;
-
-  /**
-   * Gets an existing block.
-   *
-   * @param container - Container from which the block needs to be fetched.
-   * @param blockID - BlockID of the Block.
-   * @return Block Data.
-   * @throws IOException
-   */
-  BlockData getBlock(Container container, BlockID blockID) throws IOException;
-
-  /**
-   * Deletes an existing block.
-   *
-   * @param container - Container from which the block needs to be deleted.
-   * @param blockID - ID of the block.
-   * @throws StorageContainerException
-   */
-  void deleteBlock(Container container, BlockID blockID) throws IOException;
-
-  /**
-   * List blocks in a container.
-   *
-   * @param container - Container from which blocks need to be listed.
-   * @param startLocalID  - Block to start from, 0 to begin.
-   * @param count    - Number of blocks to return.
-   * @return List of Blocks that match the criteria.
-   */
-  List<BlockData> listBlock(Container container, long startLocalID, int count)
-      throws IOException;
-
-  /**
-   * Returns the last committed block length for the block.
-   * @param blockID blockId
-   */
-  long getCommittedBlockLength(Container container, BlockID blockID)
-      throws IOException;
-
-  /**
-   * Shuts down the BlockManager.
-   */
-  void shutdown();
-}
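
For context, the contract removed above can be sketched with a minimal in-memory implementation. Everything below is illustrative only: BlockRecord is a hypothetical stand-in for the real BlockData, and container lookup, metadata persistence and error codes are omitted.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Minimal in-memory sketch of the BlockManager contract. BlockRecord is a
// simplified stand-in for the real BlockData, which carries far more state.
public final class InMemoryBlockManagerSketch {

  public static final class BlockRecord {
    final long localId;   // the block's local ID within the container
    final long length;    // committed length of the block
    public BlockRecord(long localId, long length) {
      this.localId = localId;
      this.length = length;
    }
  }

  // Sorted map so listBlock can honour the (startLocalID, count) contract.
  private final ConcurrentSkipListMap<Long, BlockRecord> blocks =
      new ConcurrentSkipListMap<>();

  // Put-or-overwrite; returns the block length, as the interface requires.
  public long putBlock(BlockRecord data) {
    blocks.put(data.localId, data);
    return data.length;
  }

  public BlockRecord getBlock(long blockId) throws IOException {
    BlockRecord data = blocks.get(blockId);
    if (data == null) {
      throw new IOException("Unable to find the block " + blockId);
    }
    return data;
  }

  public void deleteBlock(long blockId) {
    blocks.remove(blockId);
  }

  // List up to 'count' blocks whose local ID is >= startLocalId.
  public List<BlockRecord> listBlock(long startLocalId, int count) {
    List<BlockRecord> result = new ArrayList<>();
    for (Map.Entry<Long, BlockRecord> e
        : blocks.tailMap(startLocalId).entrySet()) {
      if (result.size() >= count) {
        break;
      }
      result.add(e.getValue());
    }
    return result;
  }
}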

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
deleted file mode 100644
index 7134be1..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.hadoop.ozone.container.keyvalue.interfaces;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-
-/**
- * Chunk Manager allows reading, writing, deleting and listing of chunks in
- * a container.
- */
-
-public interface ChunkManager {
-
-  /**
-   * Writes a given chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block.
-   * @param info - ChunkInfo.
-   * @param stage - stage of the chunk write operation.
-   * @throws StorageContainerException
-   */
-  void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-                  byte[] data, ContainerProtos.Stage stage)
-      throws StorageContainerException;
-
-  /**
-   * Reads the data defined by a chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block.
-   * @param info - ChunkInfo.
-   * @return  byte array
-   * @throws StorageContainerException
-   *
-   * TODO: Right now we do not support partial reads and writes of chunks.
-   * TODO: Explore if we need to do that for ozone.
-   */
-  byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) throws
-      StorageContainerException;
-
-  /**
-   * Deletes a given chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block.
-   * @param info  - Chunk Info
-   * @throws StorageContainerException
-   */
-  void deleteChunk(Container container, BlockID blockID, ChunkInfo info) throws
-      StorageContainerException;
-
-  // TODO : Support list operations.
-
-  /**
-   * Shuts down the ChunkManager.
-   */
-  void shutdown();
-
-}
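
The read/write contract above (whole chunks only, no partial I/O) can be sketched as one file per chunk. The paths and helper names below are illustrative, not the real KeyValueContainer layout, locking, or checksum handling.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// One file per chunk: since partial reads/writes are unsupported, a whole
// chunk maps naturally onto a whole file under the container's chunks dir.
final class FileChunkSketch {

  static void writeChunk(Path chunksDir, String chunkName, byte[] data)
      throws IOException {
    Files.createDirectories(chunksDir);              // ensure the chunks dir exists
    Files.write(chunksDir.resolve(chunkName), data); // write the full chunk
  }

  // A real implementation would verify the length and checksum recorded in
  // ChunkInfo before returning the bytes.
  static byte[] readChunk(Path chunksDir, String chunkName) throws IOException {
    return Files.readAllBytes(chunksDir.resolve(chunkName));
  }

  static void deleteChunk(Path chunksDir, String chunkName) throws IOException {
    Files.deleteIfExists(chunksDir.resolve(chunkName));
  }
}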

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/package-info.java
deleted file mode 100644
index 5129094..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue.interfaces;
-/**
- * Chunk manager and block manager interfaces for keyvalue container type.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/package-info.java
deleted file mode 100644
index 53c9f1e..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-/**
- This package contains classes for KeyValue container type.
- **/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
deleted file mode 100644
index d96fbfa..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ /dev/null
@@ -1,258 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ratis.shaded.com.google.protobuf
-    .InvalidProtocolBufferException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BackgroundTask;
-import org.apache.hadoop.utils.BackgroundTaskQueue;
-import org.apache.hadoop.utils.BackgroundTaskResult;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
-
-/**
- * A per-datanode container block deleting service that takes charge of
- * deleting stale ozone blocks.
- */
-// TODO: Fix BlockDeletingService to work with new StorageLayer
-public class BlockDeletingService extends BackgroundService{
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockDeletingService.class);
-
-  private ContainerSet containerSet;
-  private ContainerDeletionChoosingPolicy containerDeletionPolicy;
-  private final Configuration conf;
-
-  // Throttles the number of blocks to delete per task;
-  // set to 1 for testing.
-  private final int blockLimitPerTask;
-
-  // Throttles the number of containers processed concurrently per interval.
-  private final int containerLimitPerInterval;
-
-  // Task priority is useful when a to-delete block has weight.
-  private final static int TASK_PRIORITY_DEFAULT = 1;
-  // Core pool size for container tasks
-  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
-
-  public BlockDeletingService(ContainerSet containerSet, long serviceInterval,
-      long serviceTimeout, TimeUnit timeUnit, Configuration conf) {
-    super("BlockDeletingService", serviceInterval, timeUnit,
-        BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
-    this.containerSet = containerSet;
-    containerDeletionPolicy = ReflectionUtils.newInstance(conf.getClass(
-        ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
-        TopNOrderedContainerDeletionChoosingPolicy.class,
-        ContainerDeletionChoosingPolicy.class), conf);
-    this.conf = conf;
-    this.blockLimitPerTask = conf.getInt(
-        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
-        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
-    this.containerLimitPerInterval = conf.getInt(
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
-  }
-
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    List<ContainerData> containers = Lists.newArrayList();
-    try {
-      // We list at most a fixed number of containers at a time, so that
-      // too many containers do not spawn too many workers.
-      // We must ensure there is no empty container in this result.
-      // The chosen result depends on what container deletion policy is
-      // configured.
-      containers = containerSet.chooseContainerForBlockDeletion(
-          containerLimitPerInterval, containerDeletionPolicy);
-      if (containers.size() > 0) {
-        LOG.info("Plan to choose {} containers for block deletion, "
-                + "actually returns {} valid containers.",
-            containerLimitPerInterval, containers.size());
-      }
-
-      for(ContainerData container : containers) {
-        BlockDeletingTask containerTask =
-            new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
-        queue.add(containerTask);
-      }
-    } catch (StorageContainerException e) {
-      LOG.warn("Failed to initiate block deleting tasks, "
-          + "caused by unable to get containers info. "
-          + "Retry in next interval. ", e);
-    } catch (Exception e) {
-      // In case listContainer call throws any uncaught RuntimeException.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Unexpected error occurs during deleting blocks.", e);
-      }
-    }
-    return queue;
-  }
-
-  private static class ContainerBackgroundTaskResult
-      implements BackgroundTaskResult {
-    private List<String> deletedBlockIds;
-
-    ContainerBackgroundTaskResult() {
-      deletedBlockIds = new LinkedList<>();
-    }
-
-    public void addBlockId(String blockId) {
-      deletedBlockIds.add(blockId);
-    }
-
-    public void addAll(List<String> blockIds) {
-      deletedBlockIds.addAll(blockIds);
-    }
-
-    public List<String> getDeletedBlocks() {
-      return deletedBlockIds;
-    }
-
-    @Override
-    public int getSize() {
-      return deletedBlockIds.size();
-    }
-  }
-
-  private class BlockDeletingTask
-      implements BackgroundTask<BackgroundTaskResult> {
-
-    private final int priority;
-    private final KeyValueContainerData containerData;
-
-    BlockDeletingTask(ContainerData containerName, int priority) {
-      this.priority = priority;
-      this.containerData = (KeyValueContainerData) containerName;
-    }
-
-    @Override
-    public BackgroundTaskResult call() throws Exception {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
-      long startTime = Time.monotonicNow();
-      // Scan container's db and get list of under deletion blocks
-      MetadataStore meta = BlockUtils.getDB(
-          (KeyValueContainerData) containerData, conf);
-      // # of blocks to delete is throttled
-      KeyPrefixFilter filter =
-          new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
-      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
-          meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
-      if (toDeleteBlocks.isEmpty()) {
-        LOG.debug("No under deletion block found in container : {}",
-            containerData.getContainerID());
-      }
-
-      List<String> succeedBlocks = new LinkedList<>();
-      LOG.debug("Container : {}, To-Delete blocks : {}",
-          containerData.getContainerID(), toDeleteBlocks.size());
-      File dataDir = new File(containerData.getChunksPath());
-      if (!dataDir.exists() || !dataDir.isDirectory()) {
-        LOG.error("Invalid container data dir {} : "
-            + "does not exist or not a directory", dataDir.getAbsolutePath());
-        return crr;
-      }
-
-      toDeleteBlocks.forEach(entry -> {
-        String blockName = DFSUtil.bytes2String(entry.getKey());
-        LOG.debug("Deleting block {}", blockName);
-        try {
-          ContainerProtos.BlockData data =
-              ContainerProtos.BlockData.parseFrom(entry.getValue());
-          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
-            File chunkFile = dataDir.toPath()
-                .resolve(chunkInfo.getChunkName()).toFile();
-            if (FileUtils.deleteQuietly(chunkFile)) {
-              LOG.debug("block {} chunk {} deleted", blockName,
-                  chunkFile.getAbsolutePath());
-            }
-          }
-          succeedBlocks.add(blockName);
-        } catch (InvalidProtocolBufferException e) {
-          LOG.error("Failed to parse block info for block {}", blockName, e);
-        }
-      });
-
-      // Once files are deleted... replace deleting entries with deleted entries
-      BatchOperation batch = new BatchOperation();
-      succeedBlocks.forEach(entry -> {
-        String blockId =
-            entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length());
-        String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId;
-        batch.put(DFSUtil.string2Bytes(deletedEntry),
-            DFSUtil.string2Bytes(blockId));
-        batch.delete(DFSUtil.string2Bytes(entry));
-      });
-      meta.writeBatch(batch);
-      // update count of pending deletion blocks in in-memory container status
-      containerData.decrPendingDeletionBlocks(succeedBlocks.size());
-
-      if (!succeedBlocks.isEmpty()) {
-        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
-            containerData.getContainerID(), succeedBlocks.size(),
-            Time.monotonicNow() - startTime);
-      }
-      crr.addAll(succeedBlocks);
-      return crr;
-    }
-
-    @Override
-    public int getPriority() {
-      return priority;
-    }
-  }
-}
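
The deleting-to-deleted key move performed by BlockDeletingTask can be sketched against a plain Map standing in for the container's MetadataStore. The prefix literals below mirror what OzoneConsts is assumed to define; the class and method names are hypothetical.

import java.util.List;
import java.util.Map;

// Sketch of the prefix-rename step: move each successfully deleted block
// from the "deleting" namespace to the "deleted" namespace, mirroring the
// BatchOperation in the task above.
final class DeletingKeyMoveSketch {
  static final String DELETING_KEY_PREFIX = "#deleting#";  // assumed value
  static final String DELETED_KEY_PREFIX = "#deleted#";    // assumed value

  // Returns how many entries were moved, which the caller would use to
  // decrement the container's pending-deletion block count.
  static int markDeleted(Map<String, String> db, List<String> succeedBlocks) {
    for (String entry : succeedBlocks) {
      String blockId = entry.substring(DELETING_KEY_PREFIX.length());
      db.put(DELETED_KEY_PREFIX + blockId, blockId);  // batch.put(...)
      db.remove(entry);                               // batch.delete(...)
    }
    return succeedBlocks.size();
  }
}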

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/package-info.java
deleted file mode 100644
index 69d8042..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
deleted file mode 100644
index c3a4126..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.ozoneimpl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-
-/**
- * Class used to read .container files from a volume and build the container map.
- *
- * Layout of the container directory on disk is as follows:
- *
- * ../hdds/VERSION
- * ../hdds/<<scmUuid>>/current/<<containerDir>>/<<containerID>>/metadata/<<containerID>>.container
- * ../hdds/<<scmUuid>>/current/<<containerDir>>/<<containerID>>/<<dataPath>>
- *
- * Some ContainerTypes will have extra metadata other than the .container
- * file. For example, KeyValueContainer will have a .db file. This .db file
- * will also be stored in the metadata folder along with the .container file.
- *
- * ../hdds/<<scmUuid>>/current/<<containerDir>>/<<KVcontainerID>>/metadata/<<KVcontainerID>>.db
- *
- * Note that the <<dataPath>> is dependent on the ContainerType.
- * For KeyValueContainers, the data is stored in a "chunks" folder. As such,
- * the <<dataPath>> layout for KeyValueContainers is
- *
- * ../hdds/<<scmUuid>>/current/<<containerDir>>/<<KVcontainerID>>/chunks/<<chunksFile>>
- *
- */
-public class ContainerReader implements Runnable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      ContainerReader.class);
-  private HddsVolume hddsVolume;
-  private final ContainerSet containerSet;
-  private final OzoneConfiguration config;
-  private final File hddsVolumeDir;
-  private final VolumeSet volumeSet;
-
-  ContainerReader(VolumeSet volSet, HddsVolume volume, ContainerSet cset,
-                  OzoneConfiguration conf) {
-    Preconditions.checkNotNull(volume);
-    this.hddsVolume = volume;
-    this.hddsVolumeDir = hddsVolume.getHddsRootDir();
-    this.containerSet = cset;
-    this.config = conf;
-    this.volumeSet = volSet;
-  }
-
-  @Override
-  public void run() {
-    try {
-      readVolume(hddsVolumeDir);
-    } catch (RuntimeException ex) {
-      LOG.info("Caught an Run time exception during reading container files" +
-          " from Volume {}", hddsVolumeDir);
-    }
-  }
-
-  public void readVolume(File hddsVolumeRootDir) {
-    Preconditions.checkNotNull(hddsVolumeRootDir, "hddsVolumeRootDir " +
-        "cannot be null");
-
-    // Filter for the SCM directory.
-    File[] scmDir = hddsVolumeRootDir.listFiles(new FileFilter() {
-      @Override
-      public boolean accept(File pathname) {
-        return pathname.isDirectory();
-      }
-    });
-
-    if (scmDir == null) {
-      LOG.error("IO error for the volume {}, skipped loading",
-          hddsVolumeRootDir);
-      volumeSet.failVolume(hddsVolumeRootDir.getPath());
-      return;
-    }
-
-    if (scmDir.length > 1) {
-      LOG.error("Volume {} is in Inconsistent state", hddsVolumeRootDir);
-      volumeSet.failVolume(hddsVolumeRootDir.getPath());
-      return;
-    }
-
-    for (File scmLoc : scmDir) {
-      File currentDir = new File(scmLoc, Storage.STORAGE_DIR_CURRENT);
-      File[] containerTopDirs = currentDir.listFiles();
-      if (containerTopDirs != null) {
-        for (File containerTopDir : containerTopDirs) {
-          if (containerTopDir.isDirectory()) {
-            File[] containerDirs = containerTopDir.listFiles();
-            if (containerDirs != null) {
-              for (File containerDir : containerDirs) {
-                File containerFile = ContainerUtils.getContainerFile(
-                    containerDir);
-                long containerID = ContainerUtils.getContainerID(containerDir);
-                if (containerFile.exists()) {
-                  verifyContainerFile(containerID, containerFile);
-                } else {
-                  LOG.error("Missing .container file for ContainerID: {}",
-                      containerDir.getName());
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private void verifyContainerFile(long containerID, File containerFile) {
-    try {
-      ContainerData containerData = ContainerDataYaml.readContainerFile(
-          containerFile);
-      if (containerID != containerData.getContainerID()) {
-        LOG.error("Invalid ContainerID in file {}. " +
-            "Skipping loading of this container.", containerFile);
-        return;
-      }
-      verifyContainerData(containerData);
-    } catch (IOException ex) {
-      LOG.error("Failed to parse ContainerFile for ContainerID: {}",
-          containerID, ex);
-    }
-  }
-
-  public void verifyContainerData(ContainerData containerData)
-      throws IOException {
-    switch (containerData.getContainerType()) {
-    case KeyValueContainer:
-      if (containerData instanceof KeyValueContainerData) {
-        KeyValueContainerData kvContainerData = (KeyValueContainerData)
-            containerData;
-        containerData.setVolume(hddsVolume);
-
-        KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
-        KeyValueContainer kvContainer = new KeyValueContainer(
-            kvContainerData, config);
-        MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config);
-        MetadataKeyFilters.KeyPrefixFilter filter =
-            new MetadataKeyFilters.KeyPrefixFilter()
-                .addFilter(OzoneConsts.DELETING_KEY_PREFIX);
-        int numPendingDeletionBlocks =
-            containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
-                .size();
-        kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
-        byte[] delTxnId = containerDB.get(
-            DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
-        if (delTxnId != null) {
-          kvContainerData
-              .updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
-        }
-        containerSet.addContainer(kvContainer);
-      } else {
-        throw new StorageContainerException("Container File is corrupted. " +
-            "ContainerType is KeyValueContainer but cast to " +
-            "KeyValueContainerData failed. ",
-            ContainerProtos.Result.CONTAINER_METADATA_ERROR);
-      }
-      break;
-    default:
-      throw new StorageContainerException("Unrecognized ContainerType " +
-          containerData.getContainerType(),
-          ContainerProtos.Result.UNKNOWN_CONTAINER_TYPE);
-    }
-  }
-}
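
The directory walk that ContainerReader performs can be reduced to the sketch below, which only collects <<containerID>>.container files. The layout constants follow the class javadoc above; YAML parsing, volume failure handling and DB reads are omitted, and the class name is hypothetical.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Sketch of ContainerReader's volume walk, reduced to collecting
// <containerID>.container files under .../<scmUuid>/current/.
final class ContainerFileWalkSketch {

  static List<File> findContainerFiles(File hddsVolumeRootDir) {
    List<File> found = new ArrayList<>();
    File[] scmDirs = hddsVolumeRootDir.listFiles(File::isDirectory);
    if (scmDirs == null || scmDirs.length != 1) {
      return found;  // IO error or inconsistent volume: skip it, as the reader does
    }
    File currentDir = new File(scmDirs[0], "current");
    File[] containerTopDirs = currentDir.listFiles(File::isDirectory);
    if (containerTopDirs == null) {
      return found;
    }
    for (File topDir : containerTopDirs) {
      File[] containerDirs = topDir.listFiles(File::isDirectory);
      if (containerDirs == null) {
        continue;
      }
      for (File containerDir : containerDirs) {
        // metadata/<containerID>.container, per the layout in the javadoc
        File containerFile = new File(new File(containerDir, "metadata"),
            containerDir.getName() + ".container");
        if (containerFile.exists()) {
          found.add(containerFile);
        }
      }
    }
    return found;
  }
}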

