http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
deleted file mode 100644
index 09d4054..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerLifeCycleState;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
-import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.helpers
-    .KeyValueContainerLocationUtil;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.utils.MetadataStore;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_ALREADY_EXISTS;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_FILES_CREATE_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.DISK_OUT_OF_SPACE;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.ERROR_IN_COMPACT_DB;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.INVALID_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNSUPPORTED_REQUEST;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class to perform KeyValue Container operations.
- */
-public class KeyValueContainer implements Container<KeyValueContainerData> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Container.class);
-
-  // Use a non-fair RW lock for better throughput; we may revisit this decision
-  // if this causes fairness issues.
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-  private final KeyValueContainerData containerData;
-  private Configuration config;
-
-  public KeyValueContainer(KeyValueContainerData containerData, Configuration
-      ozoneConfig) {
-    Preconditions.checkNotNull(containerData, "KeyValueContainerData cannot " +
-        "be null");
-    Preconditions.checkNotNull(ozoneConfig, "Ozone configuration cannot " +
-        "be null");
-    this.config = ozoneConfig;
-    this.containerData = containerData;
-  }
-
-  @Override
-  public void create(VolumeSet volumeSet, VolumeChoosingPolicy
-      volumeChoosingPolicy, String scmId) throws StorageContainerException {
-    Preconditions.checkNotNull(volumeChoosingPolicy, "VolumeChoosingPolicy " +
-        "cannot be null");
-    Preconditions.checkNotNull(volumeSet, "VolumeSet cannot be null");
-    Preconditions.checkNotNull(scmId, "scmId cannot be null");
-
-    File containerMetaDataPath = null;
-    //acquiring volumeset lock and container lock
-    volumeSet.acquireLock();
-    long maxSize = containerData.getMaxSize();
-    try {
-      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
-          .getVolumesList(), maxSize);
-      String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
-
-      long containerID = containerData.getContainerID();
-
-      containerMetaDataPath = KeyValueContainerLocationUtil
-          .getContainerMetaDataPath(hddsVolumeDir, scmId, containerID);
-      containerData.setMetadataPath(containerMetaDataPath.getPath());
-
-      File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
-          hddsVolumeDir, scmId, containerID);
-
-      // Check if it is new Container.
-      ContainerUtils.verifyIsNewContainer(containerMetaDataPath);
-
-      //Create metadata path, chunks path and metadata db
-      File dbFile = getContainerDBFile();
-      KeyValueContainerUtil.createContainerMetaData(containerMetaDataPath,
-          chunksPath, dbFile, config);
-
-      String impl = config.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
-          OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT);
-
-      //Set containerData for the KeyValueContainer.
-      containerData.setChunksPath(chunksPath.getPath());
-      containerData.setContainerDBType(impl);
-      containerData.setDbFile(dbFile);
-      containerData.setVolume(containerVolume);
-
-      // Create .container file
-      File containerFile = getContainerFile();
-      createContainerFile(containerFile);
-
-    } catch (StorageContainerException ex) {
-      if (containerMetaDataPath != null && containerMetaDataPath.getParentFile()
-          .exists()) {
-        FileUtil.fullyDelete(containerMetaDataPath.getParentFile());
-      }
-      throw ex;
-    } catch (DiskOutOfSpaceException ex) {
-      throw new StorageContainerException("Container creation failed, due to " 
+
-          "disk out of space", ex, DISK_OUT_OF_SPACE);
-    } catch (FileAlreadyExistsException ex) {
-      throw new StorageContainerException("Container creation failed because " 
+
-          "ContainerFile already exists", ex, CONTAINER_ALREADY_EXISTS);
-    } catch (IOException ex) {
-      if (containerMetaDataPath != null && containerMetaDataPath.getParentFile()
-          .exists()) {
-        FileUtil.fullyDelete(containerMetaDataPath.getParentFile());
-      }
-      throw new StorageContainerException("Container creation failed.", ex,
-          CONTAINER_INTERNAL_ERROR);
-    } finally {
-      volumeSet.releaseLock();
-    }
-  }
-
-  /**
-   * Set all of the path-related container data fields based on the naming
-   * conventions.
-   *
-   * @param scmId
-   * @param containerVolume
-   * @param hddsVolumeDir
-   */
-  public void populatePathFields(String scmId,
-      HddsVolume containerVolume, String hddsVolumeDir) {
-
-    long containerId = containerData.getContainerID();
-
-    File containerMetaDataPath = KeyValueContainerLocationUtil
-        .getContainerMetaDataPath(hddsVolumeDir, scmId, containerId);
-
-    File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
-        hddsVolumeDir, scmId, containerId);
-    File dbFile = KeyValueContainerLocationUtil.getContainerDBFile(
-        containerMetaDataPath, containerId);
-
-    //Set containerData for the KeyValueContainer.
-    containerData.setMetadataPath(containerMetaDataPath.getPath());
-    containerData.setChunksPath(chunksPath.getPath());
-    containerData.setDbFile(dbFile);
-    containerData.setVolume(containerVolume);
-  }
-
-  /**
-   * Writes to .container file.
-   *
-   * @param containerFile container file name
-   * @param isCreate True if creating a new file. False if updating an
-   *                 existing container file.
-   * @throws StorageContainerException
-   */
-  private void writeToContainerFile(File containerFile, boolean isCreate)
-      throws StorageContainerException {
-    File tempContainerFile = null;
-    long containerId = containerData.getContainerID();
-    try {
-      tempContainerFile = createTempFile(containerFile);
-      ContainerDataYaml.createContainerFile(
-          ContainerType.KeyValueContainer, containerData, tempContainerFile);
-
-      // NativeIO.renameTo is an atomic function. But it might fail if the
-      // container file already exists. Hence, we handle the two cases
-      // separately.
-      if (isCreate) {
-        NativeIO.renameTo(tempContainerFile, containerFile);
-      } else {
-        Files.move(tempContainerFile.toPath(), containerFile.toPath(),
-            StandardCopyOption.REPLACE_EXISTING);
-      }
-
-    } catch (IOException ex) {
-      throw new StorageContainerException("Error while creating/ updating " +
-          ".container file. ContainerID: " + containerId, ex,
-          CONTAINER_FILES_CREATE_ERROR);
-    } finally {
-      if (tempContainerFile != null && tempContainerFile.exists()) {
-        if (!tempContainerFile.delete()) {
-          LOG.warn("Unable to delete container temporary file: {}.",
-              tempContainerFile.getAbsolutePath());
-        }
-      }
-    }
-  }
-
-  private void createContainerFile(File containerFile)
-      throws StorageContainerException {
-    writeToContainerFile(containerFile, true);
-  }
-
-  private void updateContainerFile(File containerFile)
-      throws StorageContainerException {
-    writeToContainerFile(containerFile, false);
-  }
-
-
-  @Override
-  public void delete(boolean forceDelete)
-      throws StorageContainerException {
-    long containerId = containerData.getContainerID();
-    try {
-      KeyValueContainerUtil.removeContainer(containerData, config, forceDelete);
-    } catch (StorageContainerException ex) {
-      throw ex;
-    } catch (IOException ex) {
-      // TODO : An I/O error during delete can leave partial artifacts on the
-      // disk. We will need the cleaner thread to cleanup this information.
-      String errMsg = String.format("Failed to cleanup container. ID: %d",
-          containerId);
-      LOG.error(errMsg, ex);
-      throw new StorageContainerException(errMsg, ex, CONTAINER_INTERNAL_ERROR);
-    }
-  }
-
-  @Override
-  public void close() throws StorageContainerException {
-
-    //TODO: writing .container file and compaction can be done
-    // asynchronously, otherwise rpc call for this will take a lot of time to
-    // complete this action
-    try {
-      writeLock();
-
-      containerData.closeContainer();
-      File containerFile = getContainerFile();
-      // update the new container data to .container File
-      updateContainerFile(containerFile);
-
-    } catch (StorageContainerException ex) {
-      // Failed to update .container file. Reset the state to CLOSING
-      containerData.setState(ContainerLifeCycleState.CLOSING);
-      throw ex;
-    } finally {
-      writeUnlock();
-    }
-
-    // It is ok if this operation takes a bit of time.
-    // Close container is not expected to be instantaneous.
-    try {
-      MetadataStore db = BlockUtils.getDB(containerData, config);
-      db.compactDB();
-    } catch (StorageContainerException ex) {
-      throw ex;
-    } catch (IOException ex) {
-      LOG.error("Error in DB compaction while closing container", ex);
-      throw new StorageContainerException(ex, ERROR_IN_COMPACT_DB);
-    }
-  }
-
-  @Override
-  public KeyValueContainerData getContainerData()  {
-    return containerData;
-  }
-
-  @Override
-  public ContainerLifeCycleState getContainerState() {
-    return containerData.getState();
-  }
-
-  @Override
-  public ContainerType getContainerType() {
-    return ContainerType.KeyValueContainer;
-  }
-
-  @Override
-  public void update(Map<String, String> metadata, boolean forceUpdate)
-      throws StorageContainerException {
-
-    // TODO: Now, when writing the updated data to .container file, we are
-    // holding lock and writing data to disk. We can have async implementation
-    // to flush the update container data to disk.
-    long containerId = containerData.getContainerID();
-    if(!containerData.isValid()) {
-      LOG.debug("Invalid container data. ContainerID: {}", containerId);
-      throw new StorageContainerException("Invalid container data. " +
-          "ContainerID: " + containerId, INVALID_CONTAINER_STATE);
-    }
-    if (!forceUpdate && !containerData.isOpen()) {
-      throw new StorageContainerException(
-          "Updating a closed container without force option is not allowed. " +
-              "ContainerID: " + containerId, UNSUPPORTED_REQUEST);
-    }
-
-    Map<String, String> oldMetadata = containerData.getMetadata();
-    try {
-      writeLock();
-      for (Map.Entry<String, String> entry : metadata.entrySet()) {
-        containerData.addMetadata(entry.getKey(), entry.getValue());
-      }
-
-      File containerFile = getContainerFile();
-      // update the new container data to .container File
-      updateContainerFile(containerFile);
-    } catch (StorageContainerException  ex) {
-      containerData.setMetadata(oldMetadata);
-      throw ex;
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  @Override
-  public void updateDeleteTransactionId(long deleteTransactionId) {
-    containerData.updateDeleteTransactionId(deleteTransactionId);
-  }
-
-  @Override
-  public KeyValueBlockIterator blockIterator() throws IOException{
-    return new KeyValueBlockIterator(containerData.getContainerID(), new File(
-        containerData.getContainerPath()));
-  }
-
-  @Override
-  public void importContainerData(InputStream input,
-      ContainerPacker<KeyValueContainerData> packer) throws IOException {
-    writeLock();
-    try {
-      if (getContainerFile().exists()) {
-        String errorMessage = String.format(
-            "Can't import container (cid=%d) data to a specific location"
-                + " as the container descriptor (%s) has already been exist.",
-            getContainerData().getContainerID(),
-            getContainerFile().getAbsolutePath());
-        throw new IOException(errorMessage);
-      }
-      //copy the values from the input stream to the final destination
-      // directory.
-      byte[] descriptorContent = packer.unpackContainerData(this, input);
-
-      Preconditions.checkNotNull(descriptorContent,
-          "Container descriptor is missing from the container archive: "
-              + getContainerData().getContainerID());
-
-      //now, we have extracted the container descriptor from the previous
-      //datanode. We can load it and upload it with the current data
-      // (original metadata + current filepath fields)
-      KeyValueContainerData originalContainerData =
-          (KeyValueContainerData) ContainerDataYaml
-              .readContainer(descriptorContent);
-
-
-      containerData.setState(originalContainerData.getState());
-      containerData
-          .setContainerDBType(originalContainerData.getContainerDBType());
-      containerData.setBytesUsed(originalContainerData.getBytesUsed());
-
-      //rewriting the yaml file with new checksum calculation.
-      update(originalContainerData.getMetadata(), true);
-
-      //fill in memory stat counter (keycount, byte usage)
-      KeyValueContainerUtil.parseKVContainerData(containerData, config);
-
-    } catch (Exception ex) {
-      //delete all the temporary data in case of any exception.
-      try {
-        FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
-        FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
-        FileUtils.deleteDirectory(getContainerFile());
-      } catch (Exception deleteex) {
-        LOG.error(
-            "Can not cleanup destination directories after a container import"
-                + " error (cid" +
-                containerData.getContainerID() + ")", deleteex);
-      }
-      throw ex;
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  @Override
-  public void exportContainerData(OutputStream destination,
-      ContainerPacker<KeyValueContainerData> packer) throws IOException {
-    if (getContainerData().getState() != ContainerLifeCycleState.CLOSED) {
-      throw new IllegalStateException(
-          "Only closed containers could be exported: ContainerId="
-              + getContainerData().getContainerID());
-    }
-    packer.pack(this, destination);
-  }
-
-  /**
-   * Acquire read lock.
-   */
-  public void readLock() {
-    this.lock.readLock().lock();
-
-  }
-
-  /**
-   * Release read lock.
-   */
-  public void readUnlock() {
-    this.lock.readLock().unlock();
-  }
-
-  /**
-   * Check if the current thread holds read lock.
-   */
-  public boolean hasReadLock() {
-    return this.lock.readLock().tryLock();
-  }
-
-  /**
-   * Acquire write lock.
-   */
-  public void writeLock() {
-    this.lock.writeLock().lock();
-  }
-
-  /**
-   * Release write lock.
-   */
-  public void writeUnlock() {
-    this.lock.writeLock().unlock();
-
-  }
-
-  /**
-   * Check if the current thread holds write lock.
-   */
-  public boolean hasWriteLock() {
-    return this.lock.writeLock().isHeldByCurrentThread();
-  }
-
-  /**
-   * Acquire read lock, unless interrupted while waiting.
-   * @throws InterruptedException
-   */
-  @Override
-  public void readLockInterruptibly() throws InterruptedException {
-    this.lock.readLock().lockInterruptibly();
-  }
-
-  /**
-   * Acquire write lock, unless interrupted while waiting.
-   * @throws InterruptedException
-   */
-  @Override
-  public void writeLockInterruptibly() throws InterruptedException {
-    this.lock.writeLock().lockInterruptibly();
-
-  }
-
-  /**
-   * Returns containerFile.
-   * @return .container File name
-   */
-  @Override
-  public File getContainerFile() {
-    return new File(containerData.getMetadataPath(), containerData
-        .getContainerID() + OzoneConsts.CONTAINER_EXTENSION);
-  }
-
-  /**
-   * Returns KeyValueContainerReport for the KeyValueContainer.
-   */
-  @Override
-  public StorageContainerDatanodeProtocolProtos.ContainerInfo
-      getContainerReport() throws StorageContainerException{
-    StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
-        StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
-    ciBuilder.setContainerID(containerData.getContainerID())
-        .setReadCount(containerData.getReadCount())
-        .setWriteCount(containerData.getWriteCount())
-        .setReadBytes(containerData.getReadBytes())
-        .setWriteBytes(containerData.getWriteBytes())
-        .setKeyCount(containerData.getKeyCount())
-        .setUsed(containerData.getBytesUsed())
-        .setState(getHddsState())
-        .setDeleteTransactionId(containerData.getDeleteTransactionId());
-    return ciBuilder.build();
-  }
-
-  /**
-   * Returns LifeCycle State of the container.
-   * @return LifeCycle State of the container in HddsProtos format
-   * @throws StorageContainerException
-   */
-  private HddsProtos.LifeCycleState getHddsState()
-      throws StorageContainerException {
-    HddsProtos.LifeCycleState state;
-    switch (containerData.getState()) {
-    case OPEN:
-      state = HddsProtos.LifeCycleState.OPEN;
-      break;
-    case CLOSING:
-      state = HddsProtos.LifeCycleState.CLOSING;
-      break;
-    case CLOSED:
-      state = HddsProtos.LifeCycleState.CLOSED;
-      break;
-    default:
-      throw new StorageContainerException("Invalid Container state found: " +
-          containerData.getContainerID(), INVALID_CONTAINER_STATE);
-    }
-    return state;
-  }
-
-  /**
-   * Returns container DB file.
-   * @return
-   */
-  public File getContainerDBFile() {
-    return new File(containerData.getMetadataPath(), containerData
-        .getContainerID() + OzoneConsts.DN_CONTAINER_DB);
-  }
-
-  /**
-   * Creates a temporary file.
-   * @param file
-   * @return
-   * @throws IOException
-   */
-  private File createTempFile(File file) throws IOException{
-    return File.createTempFile("tmp_" + System.currentTimeMillis() + "_",
-        file.getName(), file.getParentFile());
-  }
-
-}

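For reference, the deleted KeyValueContainer was driven through the generic Container interface. The sketch below is illustrative only and is not part of this change; it assumes an OzoneConfiguration (conf), a populated VolumeSet (volumeSet), a VolumeChoosingPolicy (policy), plus containerId, maxSizeBytes and scmId supplied by the caller.

    // Illustrative sketch only; conf, volumeSet, policy, containerId,
    // maxSizeBytes and scmId are assumed to be initialized elsewhere.
    // (Call from a method that declares throws StorageContainerException.)
    KeyValueContainerData data =
        new KeyValueContainerData(containerId, maxSizeBytes);
    KeyValueContainer container = new KeyValueContainer(data, conf);
    // Chooses a volume, creates the metadata and chunks directories,
    // the container DB and the .container descriptor file.
    container.create(volumeSet, policy, scmId);
    // ... blocks and chunks are written through the handler ...
    // Marks the container CLOSED, rewrites the .container file and
    // compacts the container DB.
    container.close();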
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
deleted file mode 100644
index 7ffdbf5..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-
-import org.apache.hadoop.conf.StorageSize;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.yaml.snakeyaml.nodes.Tag;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static java.lang.Math.max;
-import static org.apache.hadoop.ozone.OzoneConsts.CHUNKS_PATH;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE;
-import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH;
-
-/**
- * This class represents the KeyValueContainer metadata, which is the
- * in-memory representation of container metadata and is represented on disk
- * by the .container file.
- */
-public class KeyValueContainerData extends ContainerData {
-
-  // Yaml Tag used for KeyValueContainerData.
-  public static final Tag KEYVALUE_YAML_TAG = new Tag("KeyValueContainerData");
-
-  // Fields need to be stored in .container file.
-  private static final List<String> KV_YAML_FIELDS;
-
-  // Path to Container metadata Level DB/RocksDB Store and .container file.
-  private String metadataPath;
-
-  // Path to Physical file system where chunks are stored.
-  private String chunksPath;
-
-  //Type of DB used to store key to chunks mapping
-  private String containerDBType;
-
-  private File dbFile = null;
-
-  /**
-   * Number of pending deletion blocks in KeyValueContainer.
-   */
-  private final AtomicInteger numPendingDeletionBlocks;
-
-  private long deleteTransactionId;
-
-  static {
-    // Initialize YAML fields
-    KV_YAML_FIELDS = Lists.newArrayList();
-    KV_YAML_FIELDS.addAll(YAML_FIELDS);
-    KV_YAML_FIELDS.add(METADATA_PATH);
-    KV_YAML_FIELDS.add(CHUNKS_PATH);
-    KV_YAML_FIELDS.add(CONTAINER_DB_TYPE);
-  }
-
-  /**
-   * Constructs KeyValueContainerData object.
-   * @param id - ContainerId
-   * @param size - maximum size of the container in bytes
-   */
-  public KeyValueContainerData(long id, long size) {
-    super(ContainerProtos.ContainerType.KeyValueContainer, id, size);
-    this.numPendingDeletionBlocks = new AtomicInteger(0);
-    this.deleteTransactionId = 0;
-  }
-
-  /**
-   * Constructs KeyValueContainerData object.
-   * @param id - ContainerId
-   * @param layOutVersion
-   * @param size - maximum size of the container in bytes
-   */
-  public KeyValueContainerData(long id, int layOutVersion, long size) {
-    super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion,
-        size);
-    this.numPendingDeletionBlocks = new AtomicInteger(0);
-    this.deleteTransactionId = 0;
-  }
-
-
-  /**
-   * Sets Container dbFile. This should be called only during creation of
-   * KeyValue container.
-   * @param containerDbFile
-   */
-  public void setDbFile(File containerDbFile) {
-    dbFile = containerDbFile;
-  }
-
-  /**
-   * Returns container DB file.
-   * @return dbFile
-   */
-  public File getDbFile() {
-    return dbFile;
-  }
-
-  /**
-   * Returns container metadata path.
-   * @return - Physical path where container file and checksum is stored.
-   */
-  public String getMetadataPath() {
-    return metadataPath;
-  }
-
-  /**
-   * Sets container metadata path.
-   *
-   * @param path - String.
-   */
-  public void setMetadataPath(String path) {
-    this.metadataPath = path;
-  }
-
-  /**
-   * Returns the path to base dir of the container.
-   * @return Path to base dir
-   */
-  public String getContainerPath() {
-    if (metadataPath == null) {
-      return null;
-    }
-    return new File(metadataPath).getParent();
-  }
-
-  /**
-   * Get chunks path.
-   * @return - Path where chunks are stored
-   */
-  public String getChunksPath() {
-    return chunksPath;
-  }
-
-  /**
-   * Set chunks Path.
-   * @param chunkPath - File path.
-   */
-  public void setChunksPath(String chunkPath) {
-    this.chunksPath = chunkPath;
-  }
-
-  /**
-   * Returns the DBType used for the container.
-   * @return containerDBType
-   */
-  public String getContainerDBType() {
-    return containerDBType;
-  }
-
-  /**
-   * Sets the DBType used for the container.
-   * @param containerDBType
-   */
-  public void setContainerDBType(String containerDBType) {
-    this.containerDBType = containerDBType;
-  }
-
-  /**
-   * Increase the count of pending deletion blocks.
-   *
-   * @param numBlocks increment number
-   */
-  public void incrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks.addAndGet(numBlocks);
-  }
-
-  /**
-   * Decrease the count of pending deletion blocks.
-   *
-   * @param numBlocks decrement number
-   */
-  public void decrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks.addAndGet(-1 * numBlocks);
-  }
-
-  /**
-   * Get the number of pending deletion blocks.
-   */
-  public int getNumPendingDeletionBlocks() {
-    return this.numPendingDeletionBlocks.get();
-  }
-
-  /**
-   * Sets deleteTransactionId to latest delete transactionId for the container.
-   *
-   * @param transactionId latest transactionId of the container.
-   */
-  public void updateDeleteTransactionId(long transactionId) {
-    deleteTransactionId = max(transactionId, deleteTransactionId);
-  }
-
-  /**
-   * Return the latest deleteTransactionId of the container.
-   */
-  public long getDeleteTransactionId() {
-    return deleteTransactionId;
-  }
-
-  /**
-   * Returns a ProtoBuf Message from ContainerData.
-   *
-   * @return Protocol Buffer Message
-   */
-  public ContainerProtos.ContainerData getProtoBufMessage() {
-    ContainerProtos.ContainerData.Builder builder = ContainerProtos
-        .ContainerData.newBuilder();
-    builder.setContainerID(this.getContainerID());
-    builder.setContainerPath(this.getMetadataPath());
-    builder.setState(this.getState());
-
-    for (Map.Entry<String, String> entry : getMetadata().entrySet()) {
-      ContainerProtos.KeyValue.Builder keyValBuilder =
-          ContainerProtos.KeyValue.newBuilder();
-      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
-          .setValue(entry.getValue()).build());
-    }
-
-    if (this.getBytesUsed() >= 0) {
-      builder.setBytesUsed(this.getBytesUsed());
-    }
-
-    if(this.getContainerType() != null) {
-      builder.setContainerType(ContainerProtos.ContainerType.KeyValueContainer);
-    }
-
-    return builder.build();
-  }
-
-  public static List<String> getYamlFields() {
-    return Collections.unmodifiableList(KV_YAML_FIELDS);
-  }
-
-  /**
-   * Constructs a KeyValueContainerData object from ProtoBuf classes.
-   *
-   * @param protoData - ProtoBuf Message
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public static KeyValueContainerData getFromProtoBuf(
-      ContainerProtos.ContainerData protoData) throws IOException {
-    // TODO: Add containerMaxSize to ContainerProtos.ContainerData
-    StorageSize storageSize = StorageSize.parse(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
-    KeyValueContainerData data = new KeyValueContainerData(
-        protoData.getContainerID(),
-        (long)storageSize.getUnit().toBytes(storageSize.getValue()));
-    for (int x = 0; x < protoData.getMetadataCount(); x++) {
-      data.addMetadata(protoData.getMetadata(x).getKey(),
-          protoData.getMetadata(x).getValue());
-    }
-
-    if (protoData.hasContainerPath()) {
-      String metadataPath = protoData.getContainerPath()+ File.separator +
-          OzoneConsts.CONTAINER_META_PATH;
-      data.setMetadataPath(metadataPath);
-    }
-
-    if (protoData.hasState()) {
-      data.setState(protoData.getState());
-    }
-
-    if (protoData.hasBytesUsed()) {
-      data.setBytesUsed(protoData.getBytesUsed());
-    }
-
-    return data;
-  }
-}

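The deleted KeyValueContainerData is largely a descriptor object: path fields, pending-deletion counters and a protobuf round trip. Below is a minimal, hypothetical usage sketch; the id, size, path and DB-type literals are made up for illustration, and it is not part of this change.

    // Hypothetical sketch, not part of this change.
    // (Call from a method that declares throws IOException.)
    KeyValueContainerData data =
        new KeyValueContainerData(1L, 5L * 1024 * 1024 * 1024);
    data.setMetadataPath("/hdds/volume1/scm1/current/containerDir0/1/metadata");
    data.setContainerDBType("RocksDB");
    data.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
    data.incrPendingDeletionBlocks(3);     // blocks queued for deletion
    data.updateDeleteTransactionId(42L);   // keeps the max transaction id seen
    ContainerProtos.ContainerData proto = data.getProtoBufMessage();
    KeyValueContainerData copy =
        KeyValueContainerData.getFromProtoBuf(proto);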
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
deleted file mode 100644
index 5be6e28..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ /dev/null
@@ -1,850 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerLifeCycleState;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .GetSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .PutSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume
-    .RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
-import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
-import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
-    .BlockDeletingService;
-import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.BLOCK_NOT_COMMITTED;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CLOSED_CONTAINER_IO;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.DELETE_ON_OPEN_CONTAINER;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.GET_SMALL_FILE_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.INVALID_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.PUT_SMALL_FILE_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Stage;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handler for KeyValue Container type.
- */
-public class KeyValueHandler extends Handler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      KeyValueHandler.class);
-
-  private final ContainerType containerType;
-  private final BlockManager blockManager;
-  private final ChunkManager chunkManager;
-  private final BlockDeletingService blockDeletingService;
-  private final VolumeChoosingPolicy volumeChoosingPolicy;
-  private final long maxContainerSize;
-  private final AutoCloseableLock handlerLock;
-  private final OpenContainerBlockMap openContainerBlockMap;
-
-  public KeyValueHandler(Configuration config, ContainerSet contSet,
-      VolumeSet volSet, ContainerMetrics metrics) {
-    super(config, contSet, volSet, metrics);
-    containerType = ContainerType.KeyValueContainer;
-    blockManager = new BlockManagerImpl(config);
-    chunkManager = new ChunkManagerImpl();
-    long svcInterval = config
-        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
-            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    long serviceTimeout = config
-        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
-            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    this.blockDeletingService =
-        new BlockDeletingService(containerSet, svcInterval, serviceTimeout,
-            TimeUnit.MILLISECONDS, config);
-    blockDeletingService.start();
-    volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass(
-        HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
-            .class, VolumeChoosingPolicy.class), conf);
-    maxContainerSize = (long)config.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-            ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-    // this handler lock is used for synchronizing createContainer Requests,
-    // so using a fair lock here.
-    handlerLock = new AutoCloseableLock(new ReentrantLock(true));
-    openContainerBlockMap = new OpenContainerBlockMap();
-  }
-
-  @VisibleForTesting
-  public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {
-    return volumeChoosingPolicy;
-  }
-  /**
-   * Returns OpenContainerBlockMap instance.
-   *
-   * @return OpenContainerBlockMap
-   */
-  public OpenContainerBlockMap getOpenContainerBlockMap() {
-    return openContainerBlockMap;
-  }
-
-  @Override
-  public ContainerCommandResponseProto handle(
-      ContainerCommandRequestProto request, Container container) {
-
-    Type cmdType = request.getCmdType();
-    KeyValueContainer kvContainer = (KeyValueContainer) container;
-    switch(cmdType) {
-    case CreateContainer:
-      return handleCreateContainer(request, kvContainer);
-    case ReadContainer:
-      return handleReadContainer(request, kvContainer);
-    case UpdateContainer:
-      return handleUpdateContainer(request, kvContainer);
-    case DeleteContainer:
-      return handleDeleteContainer(request, kvContainer);
-    case ListContainer:
-      return handleUnsupportedOp(request);
-    case CloseContainer:
-      return handleCloseContainer(request, kvContainer);
-    case PutBlock:
-      return handlePutBlock(request, kvContainer);
-    case GetBlock:
-      return handleGetBlock(request, kvContainer);
-    case DeleteBlock:
-      return handleDeleteBlock(request, kvContainer);
-    case ListBlock:
-      return handleUnsupportedOp(request);
-    case ReadChunk:
-      return handleReadChunk(request, kvContainer);
-    case DeleteChunk:
-      return handleDeleteChunk(request, kvContainer);
-    case WriteChunk:
-      return handleWriteChunk(request, kvContainer);
-    case ListChunk:
-      return handleUnsupportedOp(request);
-    case CompactChunk:
-      return handleUnsupportedOp(request);
-    case PutSmallFile:
-      return handlePutSmallFile(request, kvContainer);
-    case GetSmallFile:
-      return handleGetSmallFile(request, kvContainer);
-    case GetCommittedBlockLength:
-      return handleGetCommittedBlockLength(request, kvContainer);
-    default:
-      return null;
-    }
-  }
-
-  @VisibleForTesting
-  public ChunkManager getChunkManager() {
-    return this.chunkManager;
-  }
-
-  @VisibleForTesting
-  public BlockManager getBlockManager() {
-    return this.blockManager;
-  }
-
-  /**
-   * Handles Create Container Request. If successful, adds the container to
-   * ContainerSet.
-   */
-  ContainerCommandResponseProto handleCreateContainer(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-    if (!request.hasCreateContainer()) {
-      LOG.debug("Malformed Create Container request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-    // Create Container request should be passed a null container as the
-    // container would be created here.
-    Preconditions.checkArgument(kvContainer == null);
-
-    long containerID = request.getContainerID();
-
-    KeyValueContainerData newContainerData = new KeyValueContainerData(
-        containerID, maxContainerSize);
-    // TODO: Add support to add metadataList to ContainerData. Add metadata
-    // to container during creation.
-    KeyValueContainer newContainer = new KeyValueContainer(
-        newContainerData, conf);
-
-    try {
-      handlerLock.acquire();
-      if (containerSet.getContainer(containerID) == null) {
-        newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
-        containerSet.addContainer(newContainer);
-      } else {
-        throw new StorageContainerException("Container already exists with " +
-            "container Id " + containerID, ContainerProtos.Result
-            .CONTAINER_EXISTS);
-      }
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } finally {
-      handlerLock.release();
-    }
-
-    return ContainerUtils.getSuccessResponse(request);
-  }
-
-  public void populateContainerPathFields(KeyValueContainer container,
-      long maxSize) throws IOException {
-    volumeSet.acquireLock();
-    try {
-      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
-          .getVolumesList(), maxSize);
-      String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
-      container.populatePathFields(scmID, containerVolume, hddsVolumeDir);
-    } finally {
-      volumeSet.releaseLock();
-    }
-  }
-
-  /**
-   * Handles Read Container Request. Returns the ContainerData as response.
-   */
-  ContainerCommandResponseProto handleReadContainer(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-    if (!request.hasReadContainer()) {
-      LOG.debug("Malformed Read Container request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    KeyValueContainerData containerData = kvContainer.getContainerData();
-    return KeyValueContainerUtil.getReadContainerResponse(
-        request, containerData);
-  }
-
-
-  /**
-   * Handles Update Container Request. If successful, the container metadata
-   * is updated.
-   */
-  ContainerCommandResponseProto handleUpdateContainer(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasUpdateContainer()) {
-      LOG.debug("Malformed Update Container request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    boolean forceUpdate = request.getUpdateContainer().getForceUpdate();
-    List<KeyValue> keyValueList =
-        request.getUpdateContainer().getMetadataList();
-    Map<String, String> metadata = new HashMap<>();
-    for (KeyValue keyValue : keyValueList) {
-      metadata.put(keyValue.getKey(), keyValue.getValue());
-    }
-
-    try {
-      if (!metadata.isEmpty()) {
-        kvContainer.update(metadata, forceUpdate);
-      }
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    }
-    return ContainerUtils.getSuccessResponse(request);
-  }
-
-  /**
-   * Handles Delete Container Request.
-   * Open containers cannot be deleted.
-   * Holds writeLock on ContainerSet till the container is removed from
-   * containerMap. On disk deletion of container files will happen
-   * asynchronously without the lock.
-   */
-  ContainerCommandResponseProto handleDeleteContainer(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasDeleteContainer()) {
-      LOG.debug("Malformed Delete container request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    boolean forceDelete = request.getDeleteContainer().getForceDelete();
-    kvContainer.writeLock();
-    try {
-      // Check if container is open
-      if (kvContainer.getContainerData().isOpen()) {
-        kvContainer.writeUnlock();
-        throw new StorageContainerException(
-            "Deletion of Open Container is not allowed.",
-            DELETE_ON_OPEN_CONTAINER);
-      } else if (!forceDelete && kvContainer.getContainerData().getKeyCount()
-          > 0) {
-        // If the container is not empty and cannot be deleted forcibly,
-        // then throw a SCE to stop deleting.
-        kvContainer.writeUnlock();
-        throw new StorageContainerException(
-            "Container cannot be deleted because it is not empty.",
-            ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
-      } else {
-        long containerId = kvContainer.getContainerData().getContainerID();
-        containerSet.removeContainer(containerId);
-        openContainerBlockMap.removeContainer(containerId);
-        // Release the lock first.
-        // Avoid holding write locks for disk operations
-        kvContainer.writeUnlock();
-
-        kvContainer.delete(forceDelete);
-      }
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } finally {
-      if (kvContainer.hasWriteLock()) {
-        kvContainer.writeUnlock();
-      }
-    }
-    return ContainerUtils.getSuccessResponse(request);
-  }
-
-  /**
-   * Handles Close Container Request. An open container is closed.
-   */
-  ContainerCommandResponseProto handleCloseContainer(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasCloseContainer()) {
-      LOG.debug("Malformed Update Container request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    long containerID = kvContainer.getContainerData().getContainerID();
-    ContainerLifeCycleState containerState = kvContainer.getContainerState();
-
-    try {
-      if (containerState == ContainerLifeCycleState.CLOSED) {
-        LOG.debug("Container {} is already closed.", containerID);
-        return ContainerUtils.getSuccessResponse(request);
-      } else if (containerState == ContainerLifeCycleState.INVALID) {
-        LOG.debug("Invalid container data. ContainerID: {}", containerID);
-        throw new StorageContainerException("Invalid container data. " +
-            "ContainerID: " + containerID, INVALID_CONTAINER_STATE);
-      }
-
-      KeyValueContainerData kvData = kvContainer.getContainerData();
-
-      // remove the container from the open block map once all the blocks
-      // have been committed and the container is closed
-      kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
-      commitPendingBlocks(kvContainer);
-      kvContainer.close();
-      // make sure the container's open keys from BlockMap get removed
-      openContainerBlockMap.removeContainer(kvData.getContainerID());
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Close Container failed", ex,
-              IO_EXCEPTION), request);
-    }
-
-    return ContainerUtils.getSuccessResponse(request);
-  }
-
-  /**
-   * Handle Put Block operation. Calls BlockManager to process the request.
-   */
-  ContainerCommandResponseProto handlePutBlock(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    long blockLength;
-    if (!request.hasPutBlock()) {
-      LOG.debug("Malformed Put Key request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    try {
-      checkContainerOpen(kvContainer);
-
-      BlockData blockData = BlockData.getFromProtoBuf(
-          request.getPutBlock().getBlockData());
-      long numBytes = blockData.getProtoBufMessage().toByteArray().length;
-      blockLength = commitKey(blockData, kvContainer);
-      metrics.incContainerBytesStats(Type.PutBlock, numBytes);
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Put Key failed", ex, IO_EXCEPTION),
-          request);
-    }
-
-    return BlockUtils.putBlockResponseSuccess(request, blockLength);
-  }
-
-  private void commitPendingBlocks(KeyValueContainer kvContainer)
-      throws IOException {
-    long containerId = kvContainer.getContainerData().getContainerID();
-    List<BlockData> pendingBlocks =
-        this.openContainerBlockMap.getOpenBlocks(containerId);
-    for(BlockData blockData : pendingBlocks) {
-      commitKey(blockData, kvContainer);
-    }
-  }
-
-  private long commitKey(BlockData blockData, KeyValueContainer kvContainer)
-      throws IOException {
-    Preconditions.checkNotNull(blockData);
-    long length = blockManager.putBlock(kvContainer, blockData);
-    //update the open key Map in containerManager
-    this.openContainerBlockMap.removeFromBlockMap(blockData.getBlockID());
-    return length;
-  }
-  /**
-   * Handle Get Block operation. Calls BlockManager to process the request.
-   */
-  ContainerCommandResponseProto handleGetBlock(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasGetBlock()) {
-      LOG.debug("Malformed Get Key request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    BlockData responseData;
-    try {
-      BlockID blockID = BlockID.getFromProtobuf(
-          request.getGetBlock().getBlockID());
-      responseData = blockManager.getBlock(kvContainer, blockID);
-      long numBytes = responseData.getProtoBufMessage().toByteArray().length;
-      metrics.incContainerBytesStats(Type.GetBlock, numBytes);
-
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Get Key failed", ex, IO_EXCEPTION),
-          request);
-    }
-
-    return BlockUtils.getBlockDataResponse(request, responseData);
-  }
-
-  /**
-   * Handles GetCommittedBlockLength operation.
-   * Calls BlockManager to process the request.
-   */
-  ContainerCommandResponseProto handleGetCommittedBlockLength(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-    if (!request.hasGetCommittedBlockLength()) {
-      LOG.debug("Malformed Get Key request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    long blockLength;
-    try {
-      BlockID blockID = BlockID
-          .getFromProtobuf(request.getGetCommittedBlockLength().getBlockID());
-      // Check if it really exists in the openContainerBlockMap
-      if (openContainerBlockMap.checkIfBlockExists(blockID)) {
-        String msg = "Block " + blockID + " is not committed yet.";
-        throw new StorageContainerException(msg, BLOCK_NOT_COMMITTED);
-      }
-      blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID);
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("GetCommittedBlockLength failed", ex,
-              IO_EXCEPTION), request);
-    }
-
-    return BlockUtils.getBlockLengthResponse(request, blockLength);
-  }
-
-  /**
-   * Handle Delete Block operation. Calls BlockManager to process the request.
-   */
-  ContainerCommandResponseProto handleDeleteBlock(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasDeleteBlock()) {
-      LOG.debug("Malformed Delete Key request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    try {
-      checkContainerOpen(kvContainer);
-
-      BlockID blockID = BlockID.getFromProtobuf(
-          request.getDeleteBlock().getBlockID());
-
-      blockManager.deleteBlock(kvContainer, blockID);
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Delete Key failed", ex, IO_EXCEPTION),
-          request);
-    }
-
-    return BlockUtils.getBlockResponseSuccess(request);
-  }
-
-  /**
-   * Handle Read Chunk operation. Calls ChunkManager to process the request.
-   */
-  ContainerCommandResponseProto handleReadChunk(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasReadChunk()) {
-      LOG.debug("Malformed Read Chunk request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    ChunkInfo chunkInfo;
-    byte[] data;
-    try {
-      BlockID blockID = BlockID.getFromProtobuf(
-          request.getReadChunk().getBlockID());
-      chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk()
-          .getChunkData());
-      Preconditions.checkNotNull(chunkInfo);
-
-      data = chunkManager.readChunk(kvContainer, blockID, chunkInfo);
-      metrics.incContainerBytesStats(Type.ReadChunk, data.length);
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION),
-          request);
-    }
-
-    return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
-  }
-
-  /**
-   * Handle Delete Chunk operation. Calls ChunkManager to process the request.
-   */
-  ContainerCommandResponseProto handleDeleteChunk(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasDeleteChunk()) {
-      LOG.debug("Malformed Delete Chunk request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    try {
-      checkContainerOpen(kvContainer);
-
-      BlockID blockID = BlockID.getFromProtobuf(
-          request.getDeleteChunk().getBlockID());
-      ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk()
-          .getChunkData();
-      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
-      Preconditions.checkNotNull(chunkInfo);
-
-      chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
-      openContainerBlockMap.removeChunk(blockID, chunkInfoProto);
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Delete Chunk failed", ex,
-              IO_EXCEPTION), request);
-    }
-
-    return ChunkUtils.getChunkResponseSuccess(request);
-  }
-
-  /**
-   * Handle Write Chunk operation. Calls ChunkManager to process the request.
-   */
-  ContainerCommandResponseProto handleWriteChunk(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasWriteChunk()) {
-      LOG.debug("Malformed Write Chunk request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    try {
-      checkContainerOpen(kvContainer);
-
-      BlockID blockID = BlockID.getFromProtobuf(
-          request.getWriteChunk().getBlockID());
-      ContainerProtos.ChunkInfo chunkInfoProto =
-          request.getWriteChunk().getChunkData();
-      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
-      Preconditions.checkNotNull(chunkInfo);
-
-      byte[] data = null;
-      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
-          request.getWriteChunk().getStage() == Stage.COMBINED) {
-        data = request.getWriteChunk().getData().toByteArray();
-      }
-
-      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
-          request.getWriteChunk().getStage());
-
-      // We should increment stats after writeChunk
-      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
-          request.getWriteChunk().getStage() == Stage.COMBINED) {
-        metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
-            .getChunkData().getLen());
-      }
-
-      if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA
-          || request.getWriteChunk().getStage() == Stage.COMBINED) {
-        // the openContainerBlockMap should be updated only when the chunk is
-        // committed, i.e. in the COMMIT_DATA or COMBINED stage of the request.
-        openContainerBlockMap.addChunk(blockID, chunkInfoProto);
-      }
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Write Chunk failed", ex, 
IO_EXCEPTION),
-          request);
-    }
-
-    return ChunkUtils.getChunkResponseSuccess(request);
-  }
-
-  /**
-   * Handle Put Small File operation. Writes the chunk and associated key
-   * using a single RPC. Calls BlockManager and ChunkManager to process the
-   * request.
-   */
-  ContainerCommandResponseProto handlePutSmallFile(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasPutSmallFile()) {
-      LOG.debug("Malformed Put Small File request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-    PutSmallFileRequestProto putSmallFileReq =
-        request.getPutSmallFile();
-
-    try {
-      checkContainerOpen(kvContainer);
-
-      BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock()
-          .getBlockData().getBlockID());
-      BlockData blockData = BlockData.getFromProtoBuf(
-          putSmallFileReq.getBlock().getBlockData());
-      Preconditions.checkNotNull(blockData);
-
-      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
-          putSmallFileReq.getChunkInfo());
-      Preconditions.checkNotNull(chunkInfo);
-      byte[] data = putSmallFileReq.getData().toByteArray();
-      // The chunk is committed as part of handling putSmallFile, so there is
-      // no need to track it in openContainerBlockMap.
-      chunkManager.writeChunk(
-          kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
-
-      List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
-      chunks.add(chunkInfo.getProtoBufMessage());
-      blockData.setChunks(chunks);
-      blockManager.putBlock(kvContainer, blockData);
-      metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
-
-    } catch (StorageContainerException ex) {
-      return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Read Chunk failed", ex,
-              PUT_SMALL_FILE_ERROR), request);
-    }
-
-    return SmallFileUtils.getPutFileResponseSuccess(request);
-  }
-
-  /**
-   * Handle Get Small File operation. Gets a data stream using a key. This
-   * helps in reducing the RPC overhead for small files. Calls BlockManager and
-   * ChunkManager to process the request.
-   */
-  ContainerCommandResponseProto handleGetSmallFile(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
-
-    if (!request.hasGetSmallFile()) {
-      LOG.debug("Malformed Get Small File request. trace ID: {}",
-          request.getTraceID());
-      return ContainerUtils.malformedRequest(request);
-    }
-
-    GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
-
-    try {
-      BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock()
-          .getBlockID());
-      BlockData responseData = blockManager.getBlock(kvContainer, blockID);
-
-      ContainerProtos.ChunkInfo chunkInfo = null;
-      ByteString dataBuf = ByteString.EMPTY;
-      for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
-        byte[] data = chunkManager.readChunk(kvContainer, blockID,
-            ChunkInfo.getFromProtoBuf(chunk));
-        ByteString current = ByteString.copyFrom(data);
-        dataBuf = dataBuf.concat(current);
-        chunkInfo = chunk;
-      }
-      metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size());
-      return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf
-          .toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo));
-    } catch (StorageContainerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, request);
-    } catch (IOException ex) {
-      return ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Write Chunk failed", ex,
-              GET_SMALL_FILE_ERROR), request);
-    }
-  }
-
-  /**
-   * Handle unsupported operation.
-   */
-  ContainerCommandResponseProto handleUnsupportedOp(
-      ContainerCommandRequestProto request) {
-    // TODO : remove all unsupported operations or handle them.
-    return ContainerUtils.unsupportedRequest(request);
-  }
-
-  /**
-   * Check if container is open. Throw exception otherwise.
-   * @param kvContainer container to check.
-   * @throws StorageContainerException if the container is not in OPEN state.
-   */
-  private void checkContainerOpen(KeyValueContainer kvContainer)
-      throws StorageContainerException {
-
-    ContainerLifeCycleState containerState = kvContainer.getContainerState();
-
-    if (containerState == ContainerLifeCycleState.OPEN) {
-      return;
-    } else {
-      String msg = "Requested operation not allowed as ContainerState is " +
-          containerState;
-      ContainerProtos.Result result = null;
-      switch (containerState) {
-      case CLOSING:
-      case CLOSED:
-        result = CLOSED_CONTAINER_IO;
-        break;
-      case INVALID:
-        result = INVALID_CONTAINER_STATE;
-        break;
-      default:
-        result = CONTAINER_INTERNAL_ERROR;
-      }
-
-      throw new StorageContainerException(msg, result);
-    }
-  }
-
-  public Container importContainer(long containerID, long maxSize,
-      FileInputStream rawContainerStream,
-      TarContainerPacker packer)
-      throws IOException {
-
-    KeyValueContainerData containerData =
-        new KeyValueContainerData(containerID,
-            maxSize);
-
-    KeyValueContainer container = new KeyValueContainer(containerData,
-        conf);
-
-    populateContainerPathFields(container, maxSize);
-    container.importContainerData(rawContainerStream, packer);
-    return container;
-
-  }
-}
\ No newline at end of file
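
For reference, the handler methods removed above all share one shape: reject a request that lacks the expected sub-message, convert the protobuf payload into helper objects, delegate to the block or chunk manager, and map any IOException onto a StorageContainerException result code. The condensed sketch below illustrates that shape only; it is not part of the patch, the GetBlock accessor names are assumed from the generated protobuf code, and LOG/blockManager are fields of the removed class:

  ContainerCommandResponseProto handleGetBlockSketch(
      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
    if (!request.hasGetBlock()) {
      // 1. Reject requests that are missing the expected sub-message.
      return ContainerUtils.malformedRequest(request);
    }
    try {
      // 2. Convert the protobuf identifier into a helper object.
      BlockID blockID = BlockID.getFromProtobuf(
          request.getGetBlock().getBlockID());
      // 3. Delegate the real work to the block manager.
      BlockData data = blockManager.getBlock(kvContainer, blockID);
      return BlockUtils.getBlockDataResponse(request, data);
    } catch (StorageContainerException ex) {
      return ContainerUtils.logAndReturnError(LOG, ex, request);
    } catch (IOException ex) {
      // 4. Map generic I/O failures onto a container result code.
      return ContainerUtils.logAndReturnError(LOG,
          new StorageContainerException("Get Block failed", ex, IO_EXCEPTION),
          request);
    }
  }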

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
deleted file mode 100644
index 13689a7..0000000
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorInputStream;
-import org.apache.commons.compress.compressors.CompressorOutputStream;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.apache.commons.io.IOUtils;
-
-/**
- * Compress/uncompress KeyValueContainer data to a tar.gz archive.
- */
-public class TarContainerPacker
-    implements ContainerPacker<KeyValueContainerData> {
-
-  private static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
-
-  private static final String DB_DIR_NAME = "db";
-
-  private static final String CONTAINER_FILE_NAME = "container.yaml";
-
-
-
-  /**
-   * Given an input stream (tar file), extract the data to the directories
-   * defined by the container.
-   *
-   * @param container container which defines the destination structure.
-   * @param inputStream the input stream.
-   * @throws IOException if the archive cannot be read or extracted.
-   */
-  @Override
-  public byte[] unpackContainerData(Container<KeyValueContainerData> container,
-      InputStream inputStream)
-      throws IOException {
-    byte[] descriptorFileContent = null;
-    try {
-      KeyValueContainerData containerData = container.getContainerData();
-      CompressorInputStream compressorInputStream =
-          new CompressorStreamFactory()
-              .createCompressorInputStream(CompressorStreamFactory.GZIP,
-                  inputStream);
-
-      TarArchiveInputStream tarInput =
-          new TarArchiveInputStream(compressorInputStream);
-
-      TarArchiveEntry entry = tarInput.getNextTarEntry();
-      while (entry != null) {
-        String name = entry.getName();
-        if (name.startsWith(DB_DIR_NAME + "/")) {
-          Path destinationPath = containerData.getDbFile().toPath()
-              .resolve(name.substring(DB_DIR_NAME.length() + 1));
-          extractEntry(tarInput, entry.getSize(), destinationPath);
-        } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
-          Path destinationPath = Paths.get(containerData.getChunksPath())
-              .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
-          extractEntry(tarInput, entry.getSize(), destinationPath);
-        } else if (name.equals(CONTAINER_FILE_NAME)) {
-          // Read the descriptor bytes but don't write the file out here; the
-          // container file is handled separately via unpackContainerDescriptor.
-          descriptorFileContent = readEntry(tarInput, entry);
-        } else {
-          throw new IllegalArgumentException(
-              "Unknown entry in the tar file: " + "" + name);
-        }
-        entry = tarInput.getNextTarEntry();
-      }
-      return descriptorFileContent;
-
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't uncompress the given container: " + container
-              .getContainerData().getContainerID(),
-          e);
-    }
-  }
-
-  private void extractEntry(TarArchiveInputStream tarInput, long size,
-      Path path) throws IOException {
-    Preconditions.checkNotNull(path, "Path element should not be null");
-    Path parent = Preconditions.checkNotNull(path.getParent(),
-        "Path element should have a parent directory");
-    Files.createDirectories(parent);
-    try (BufferedOutputStream bos = new BufferedOutputStream(
-        new FileOutputStream(path.toAbsolutePath().toString()))) {
-      int bufferSize = 1024;
-      byte[] buffer = new byte[bufferSize + 1];
-      long remaining = size;
-      while (remaining > 0) {
-        int read =
-            tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
-        if (read >= 0) {
-          remaining -= read;
-          bos.write(buffer, 0, read);
-        } else {
-          remaining = 0;
-        }
-      }
-    }
-
-  }
-
-  /**
-   * Given a container, include all of its required data and metadata
-   * in a tar file.
-   *
-   * @param container Container to archive (data + metadata).
-   * @param destination Destination tar file/stream.
-   * @throws IOException if the archive cannot be written.
-   */
-  @Override
-  public void pack(Container<KeyValueContainerData> container,
-      OutputStream destination)
-      throws IOException {
-
-    KeyValueContainerData containerData = container.getContainerData();
-
-    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
-          .createCompressorOutputStream(CompressorStreamFactory.GZIP,
-              destination)) {
-
-      try (ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(
-          gzippedOut)) {
-
-        includePath(containerData.getDbFile().toString(), DB_DIR_NAME,
-            archiveOutputStream);
-
-        includePath(containerData.getChunksPath(), CHUNKS_DIR_NAME,
-            archiveOutputStream);
-
-        includeFile(container.getContainerFile(),
-            CONTAINER_FILE_NAME,
-            archiveOutputStream);
-      }
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't compress the container: " + containerData.getContainerID(),
-          e);
-    }
-
-  }
-
-  @Override
-  public byte[] unpackContainerDescriptor(InputStream inputStream)
-      throws IOException {
-    try {
-      CompressorInputStream compressorInputStream =
-          new CompressorStreamFactory()
-              .createCompressorInputStream(CompressorStreamFactory.GZIP,
-                  inputStream);
-
-      TarArchiveInputStream tarInput =
-          new TarArchiveInputStream(compressorInputStream);
-
-      TarArchiveEntry entry = tarInput.getNextTarEntry();
-      while (entry != null) {
-        String name = entry.getName();
-        if (name.equals(CONTAINER_FILE_NAME)) {
-          return readEntry(tarInput, entry);
-        }
-        entry = tarInput.getNextTarEntry();
-      }
-
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't read the container descriptor from the container archive",
-          e);
-    }
-    throw new IOException(
-        "Container descriptor is missing from the container archive.");
-  }
-
-  private byte[] readEntry(TarArchiveInputStream tarInput,
-      TarArchiveEntry entry) throws IOException {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    int bufferSize = 1024;
-    byte[] buffer = new byte[bufferSize + 1];
-    long remaining = entry.getSize();
-    while (remaining > 0) {
-      int read =
-          tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
-      if (read < 0) {
-        // guard against a truncated archive ending before entry.getSize()
-        break;
-      }
-      remaining -= read;
-      bos.write(buffer, 0, read);
-    }
-    return bos.toByteArray();
-  }
-
-  private void includePath(String containerPath, String subdir,
-      ArchiveOutputStream archiveOutputStream) throws IOException {
-
-    for (Path path : Files.list(Paths.get(containerPath))
-        .collect(Collectors.toList())) {
-
-      includeFile(path.toFile(), subdir + "/" + path.getFileName(),
-          archiveOutputStream);
-    }
-  }
-
-  private void includeFile(File file, String entryName,
-      ArchiveOutputStream archiveOutputStream) throws IOException {
-    ArchiveEntry archiveEntry =
-        archiveOutputStream.createArchiveEntry(file, entryName);
-    archiveOutputStream.putArchiveEntry(archiveEntry);
-    try (FileInputStream fis = new FileInputStream(file)) {
-      IOUtils.copy(fis, archiveOutputStream);
-    }
-    archiveOutputStream.closeArchiveEntry();
-  }
-
-}
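
For context, a rough usage sketch of the removed packer follows; the container instance and the archive path are placeholders, not values taken from the patch. pack() streams the container's db/ directory, chunks/ directory and container.yaml into a gzipped tar, and unpackContainerData() restores them into the destination container, returning the raw descriptor bytes for a separate parsing step:

  TarContainerPacker packer = new TarContainerPacker();

  // Export: archive the container's DB files, chunk files and
  // container.yaml descriptor into a tar.gz stream.
  try (OutputStream out = new FileOutputStream("/tmp/container-1.tar.gz")) {
    packer.pack(container, out);
  }

  // Import: recreate db/ and chunks/ under the destination container's
  // paths; the returned bytes are the archived container.yaml content.
  byte[] descriptor;
  try (InputStream in = new FileInputStream("/tmp/container-1.tar.gz")) {
    descriptor = packer.unpackContainerData(container, in);
  }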

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
deleted file mode 100644
index f5cc847..0000000
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .GetBlockResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    GetCommittedBlockLengthResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    PutBlockResponseProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
-
-import java.io.IOException;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_BLOCK;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNABLE_TO_READ_METADATA_DB;
-
-/**
- * Utility functions for block operations.
- */
-public final class BlockUtils {
-
-  /** Never constructed. **/
-  private BlockUtils() {
-
-  }
-  /**
-   * Get a DB handle for a given container.
-   * If the handle doesn't exist in the cache yet, first create one and
-   * add it to the cache. This function is called with the containerManager
-   * ReadLock held.
-   *
-   * @param containerData containerData.
-   * @param conf configuration.
-   * @return MetadataStore handle.
-   * @throws StorageContainerException if the DB handle cannot be opened.
-   */
-  public static MetadataStore getDB(KeyValueContainerData containerData,
-                                    Configuration conf) throws
-      StorageContainerException {
-    Preconditions.checkNotNull(containerData);
-    ContainerCache cache = ContainerCache.getInstance(conf);
-    Preconditions.checkNotNull(cache);
-    Preconditions.checkNotNull(containerData.getDbFile());
-    try {
-      return cache.getDB(containerData.getContainerID(), containerData
-          .getContainerDBType(), containerData.getDbFile().getAbsolutePath());
-    } catch (IOException ex) {
-      String message = String.format("Error opening DB. Container:%s " +
-          "ContainerPath:%s", containerData.getContainerID(), containerData
-          .getDbFile().getPath());
-      throw new StorageContainerException(message, ex,
-          UNABLE_TO_READ_METADATA_DB);
-    }
-  }
-  /**
-   * Remove a DB handler from cache.
-   *
-   * @param container - Container data.
-   * @param conf - Configuration.
-   */
-  public static void removeDB(KeyValueContainerData container, Configuration
-      conf) {
-    Preconditions.checkNotNull(container);
-    ContainerCache cache = ContainerCache.getInstance(conf);
-    Preconditions.checkNotNull(cache);
-    cache.removeDB(container.getContainerID());
-  }
-
-  /**
-   * Shutdown all DB Handles.
-   *
-   * @param cache - Cache for DB Handles.
-   */
-  @SuppressWarnings("unchecked")
-  public static void shutdownCache(ContainerCache cache)  {
-    cache.shutdownCache();
-  }
-
-  /**
-   * Parses the {@link BlockData} from a bytes array.
-   *
-   * @param bytes Block data in bytes.
-   * @return Block data.
-   * @throws IOException if the bytes array is malformed or invalid.
-   */
-  public static BlockData getBlockData(byte[] bytes) throws IOException {
-    try {
-      ContainerProtos.BlockData blockData = ContainerProtos.BlockData.parseFrom(
-          bytes);
-      BlockData data = BlockData.getFromProtoBuf(blockData);
-      return data;
-    } catch (IOException e) {
-      throw new StorageContainerException("Failed to parse block data from " +
-          "the bytes array.", NO_SUCH_BLOCK);
-    }
-  }
-
-  /**
-   * Returns putBlock response success.
-   * @param msg - Request.
-   * @return Response.
-   */
-  public static ContainerCommandResponseProto putBlockResponseSuccess(
-      ContainerCommandRequestProto msg, long blockLength) {
-    GetCommittedBlockLengthResponseProto.Builder
-        committedBlockLengthResponseBuilder =
-        getCommittedBlockLengthResponseBuilder(blockLength,
-            msg.getPutBlock().getBlockData().getBlockID());
-    PutBlockResponseProto.Builder putKeyResponse =
-        PutBlockResponseProto.newBuilder();
-    putKeyResponse
-        .setCommittedBlockLength(committedBlockLengthResponseBuilder);
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setPutBlock(putKeyResponse);
-    return builder.build();
-  }
-  /**
-   * Returns successful blockResponse.
-   * @param msg - Request.
-   * @return Response.
-   */
-  public static ContainerCommandResponseProto getBlockResponseSuccess(
-      ContainerCommandRequestProto msg) {
-    return ContainerUtils.getSuccessResponse(msg);
-  }
-
-
-  public static ContainerCommandResponseProto getBlockDataResponse(
-      ContainerCommandRequestProto msg, BlockData data) {
-    GetBlockResponseProto.Builder getBlock = ContainerProtos
-        .GetBlockResponseProto
-        .newBuilder();
-    getBlock.setBlockData(data.getProtoBufMessage());
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setGetBlock(getBlock);
-    return  builder.build();
-  }
-
-  /**
-   * Returns successful getCommittedBlockLength Response.
-   * @param msg - Request.
-   * @return Response.
-   */
-  public static ContainerCommandResponseProto getBlockLengthResponse(
-          ContainerCommandRequestProto msg, long blockLength) {
-    GetCommittedBlockLengthResponseProto.Builder
-        committedBlockLengthResponseBuilder =
-        getCommittedBlockLengthResponseBuilder(blockLength,
-            msg.getGetCommittedBlockLength().getBlockID());
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder);
-    return builder.build();
-  }
-
-  private static GetCommittedBlockLengthResponseProto.Builder
-          getCommittedBlockLengthResponseBuilder(long blockLength,
-      ContainerProtos.DatanodeBlockID blockID) {
-    ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
-        getCommittedBlockLengthResponseBuilder = ContainerProtos.
-        GetCommittedBlockLengthResponseProto.newBuilder();
-    getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength);
-    getCommittedBlockLengthResponseBuilder.setBlockID(blockID);
-    return getCommittedBlockLengthResponseBuilder;
-  }
-}
\ No newline at end of file
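
A short, assumed usage sketch of the removed helpers follows; the block key encoding is a placeholder and is not defined by this patch. getDB() returns a cached MetadataStore handle for the container's metadata DB, and getBlockData() turns a stored value back into a BlockData object:

  MetadataStore store = BlockUtils.getDB(containerData, conf);
  try {
    byte[] rawBlock = store.get(blockKey);   // serialized BlockData, or null
    if (rawBlock != null) {
      BlockData blockData = BlockUtils.getBlockData(rawBlock);
      // ... inspect blockData (chunk list, committed length, ...) ...
    }
  } catch (IOException e) {
    // real handlers wrap this in a StorageContainerException
  }
  // BlockUtils.removeDB(containerData, conf) evicts the cached handle once
  // the container is deleted or no longer needed.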

