http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
deleted file mode 100644
index 957a6d9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.BucketInfo;
-import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.util.Time;
-import org.iq80.leveldb.DBException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * KSM bucket manager.
- */
-public class BucketManagerImpl implements BucketManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BucketManagerImpl.class);
-
-  /**
-   * KSMMetadataManager is used for accessing KSM MetadataDB and ReadWriteLock.
-   */
-  private final KSMMetadataManager metadataManager;
-
-  /**
-   * Constructs BucketManager.
-   * @param metadataManager - metadata manager used for DB access and locking.
-   */
-  public BucketManagerImpl(KSMMetadataManager metadataManager){
-    this.metadataManager = metadataManager;
-  }
-
-  /**
-   * MetadataDB is maintained in MetadataManager and shared between
-   * BucketManager, VolumeManager, and KeyManager.
-   *
-   * BucketManager uses MetadataDB to store bucket level information.
-   *
-   * Keys used in BucketManager for storing data into MetadataDB
-   * for BucketInfo:
-   * {volume/bucket} -> bucketInfo
-   *
-   * Workflow of create bucket:
-   *
-   * -> Check if the Volume exists in metadataDB, if not throw
-   * VolumeNotFoundException.
-   * -> Else check if the Bucket exists in metadataDB, if so throw
-   * BucketExistsException.
-   * -> Else update MetadataDB with BucketInfo.
-   */
-
-  /**
-   * Creates a bucket.
-   * @param bucketInfo - KsmBucketInfo.
-   */
-  @Override
-  public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
-    Preconditions.checkNotNull(bucketInfo);
-    metadataManager.writeLock().lock();
-    String volumeName = bucketInfo.getVolumeName();
-    String bucketName = bucketInfo.getBucketName();
-    try {
-      byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-
-      //Check if the volume exists
-      if (metadataManager.get(volumeKey) == null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new KSMException("Volume doesn't exist",
-            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-      //Check if bucket already exists
-      if (metadataManager.get(bucketKey) != null) {
-        LOG.debug("bucket: {} already exists ", bucketName);
-        throw new KSMException("Bucket already exist",
-            KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
-      }
-
-      KsmBucketInfo ksmBucketInfo = KsmBucketInfo.newBuilder()
-          .setVolumeName(bucketInfo.getVolumeName())
-          .setBucketName(bucketInfo.getBucketName())
-          .setAcls(bucketInfo.getAcls())
-          .setStorageType(bucketInfo.getStorageType())
-          .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
-          .setCreationTime(Time.now())
-          .build();
-      metadataManager.put(bucketKey, ksmBucketInfo.getProtobuf().toByteArray());
-
-      LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
-    } catch (IOException | DBException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Bucket creation failed for bucket:{} in volume:{}",
-            bucketName, volumeName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Returns Bucket Information.
-   *
-   * @param volumeName - Name of the Volume.
-   * @param bucketName - Name of the Bucket.
-   */
-  @Override
-  public KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    metadataManager.readLock().lock();
-    try {
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      byte[] value = metadataManager.get(bucketKey);
-      if (value == null) {
-        LOG.debug("bucket: {} not found in volume: {}.", bucketName,
-            volumeName);
-        throw new KSMException("Bucket not found",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
-      }
-      return KsmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value));
-    } catch (IOException | DBException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Exception while getting bucket info for bucket: {}",
-            bucketName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-
-  /**
-   * Sets bucket property from args.
-   * @param args - BucketArgs.
-   * @throws IOException
-   */
-  @Override
-  public void setBucketProperty(KsmBucketArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    try {
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      //Check if volume exists
-      if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) ==
-          null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new KSMException("Volume doesn't exist",
-            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-      byte[] value = metadataManager.get(bucketKey);
-      //Check if bucket exists
-      if(value == null) {
-        LOG.debug("bucket: {} not found ", bucketName);
-        throw new KSMException("Bucket doesn't exist",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
-      }
-      KsmBucketInfo oldBucketInfo = KsmBucketInfo.getFromProtobuf(
-          BucketInfo.parseFrom(value));
-      KsmBucketInfo.Builder bucketInfoBuilder = KsmBucketInfo.newBuilder();
-      bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName())
-          .setBucketName(oldBucketInfo.getBucketName());
-
-      //Check ACLs to update
-      if(args.getAddAcls() != null || args.getRemoveAcls() != null) {
-        bucketInfoBuilder.setAcls(getUpdatedAclList(oldBucketInfo.getAcls(),
-            args.getRemoveAcls(), args.getAddAcls()));
-        LOG.debug("Updating ACLs for bucket: {} in volume: {}",
-            bucketName, volumeName);
-      } else {
-        bucketInfoBuilder.setAcls(oldBucketInfo.getAcls());
-      }
-
-      //Check StorageType to update
-      StorageType storageType = args.getStorageType();
-      if (storageType != null) {
-        bucketInfoBuilder.setStorageType(storageType);
-        LOG.debug("Updating bucket storage type for bucket: {} in volume: {}",
-            bucketName, volumeName);
-      } else {
-        bucketInfoBuilder.setStorageType(oldBucketInfo.getStorageType());
-      }
-
-      //Check Versioning to update
-      Boolean versioning = args.getIsVersionEnabled();
-      if (versioning != null) {
-        bucketInfoBuilder.setIsVersionEnabled(versioning);
-        LOG.debug("Updating bucket versioning for bucket: {} in volume: {}",
-            bucketName, volumeName);
-      } else {
-        bucketInfoBuilder
-            .setIsVersionEnabled(oldBucketInfo.getIsVersionEnabled());
-      }
-      bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime());
-
-      metadataManager.put(bucketKey,
-          bucketInfoBuilder.build().getProtobuf().toByteArray());
-    } catch (IOException | DBException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
-            bucketName, volumeName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Updates the existing ACL list with the remove and add ACLs passed in.
-   * Removal is done before addition.
-   *
-   * @param existingAcls - old ACL list.
-   * @param removeAcls - ACLs to be removed.
-   * @param addAcls - ACLs to be added.
-   * @return updated ACL list.
-   */
-  private List<OzoneAcl> getUpdatedAclList(List<OzoneAcl> existingAcls,
-      List<OzoneAcl> removeAcls, List<OzoneAcl> addAcls) {
-    if(removeAcls != null && !removeAcls.isEmpty()) {
-      existingAcls.removeAll(removeAcls);
-    }
-    if(addAcls != null && !addAcls.isEmpty()) {
-      addAcls.stream().filter(acl -> !existingAcls.contains(acl)).forEach(
-          existingAcls::add);
-    }
-    return existingAcls;
-  }
-
-  /**
-   * Deletes an existing empty bucket from volume.
-   * @param volumeName - Name of the volume.
-   * @param bucketName - Name of the bucket.
-   * @throws IOException
-   */
-  public void deleteBucket(String volumeName, String bucketName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    metadataManager.writeLock().lock();
-    try {
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      //Check if volume exists
-      if (metadataManager.get(metadataManager.getVolumeKey(volumeName))
-          == null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new KSMException("Volume doesn't exist",
-            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-      //Check if bucket exists
-      if (metadataManager.get(bucketKey) == null) {
-        LOG.debug("bucket: {} not found ", bucketName);
-        throw new KSMException("Bucket doesn't exist",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
-      }
-      //Check if bucket is empty
-      if (!metadataManager.isBucketEmpty(volumeName, bucketName)) {
-        LOG.debug("bucket: {} is not empty ", bucketName);
-        throw new KSMException("Bucket is not empty",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY);
-      }
-      metadataManager.delete(bucketKey);
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Delete bucket failed for bucket:{} in volume:{}", 
bucketName,
-            volumeName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KsmBucketInfo> listBuckets(String volumeName,
-      String startBucket, String bucketPrefix, int maxNumOfBuckets)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.listBuckets(
-          volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-}

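For context, a minimal sketch of how a caller could drive the removed BucketManager; the configuration, names, and surrounding scope are hypothetical (imports elided), and the builder calls mirror the ones createBucket itself uses above:

    KSMMetadataManager metadataManager = new KSMMetadataManagerImpl(conf);
    BucketManager bucketManager = new BucketManagerImpl(metadataManager);

    KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
        .setVolumeName("volumeOne")        // the volume must already exist
        .setBucketName("bucketOne")
        .setAcls(Collections.emptyList())
        .setStorageType(StorageType.DISK)
        .setIsVersionEnabled(false)
        .build();
    // Throws KSMException (FAILED_VOLUME_NOT_FOUND or
    // FAILED_BUCKET_ALREADY_EXISTS) on the error paths shown above.
    bucketManager.createBucket(bucketInfo);
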
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java
deleted file mode 100644
index 42331f6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.jmx.ServiceRuntimeInfo;
-
-/**
- * This is the JMX management interface for KSM information.
- */
-@InterfaceAudience.Private
-public interface KSMMXBean extends ServiceRuntimeInfo {
-
-  String getRpcPort();
-}

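A sketch of how such a bean would typically be published, using Hadoop's MBeans utility; the KeySpaceManager wiring and the ServiceRuntimeInfoImpl base class used here are assumptions, not part of this diff:

    import javax.management.ObjectName;
    import org.apache.hadoop.jmx.ServiceRuntimeInfoImpl;
    import org.apache.hadoop.metrics2.util.MBeans;

    public class KeySpaceManager extends ServiceRuntimeInfoImpl
        implements KSMMXBean {
      private ObjectName ksmInfoBeanName;

      private void registerMXBean() {
        // Published in JMX as Hadoop:service=KeySpaceManager,name=KSMInfo.
        ksmInfoBeanName = MBeans.register("KeySpaceManager", "KSMInfo", this);
      }

      @Override
      public String getRpcPort() {
        return "9862"; // placeholder; the real port comes from the RPC server
      }
    }
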
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
deleted file mode 100644
index f5a2d5b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-
-/**
- * KSM metadata manager interface.
- */
-public interface KSMMetadataManager {
-  /**
-   * Start metadata manager.
-   */
-  void start();
-
-  /**
-   * Stop metadata manager.
-   */
-  void stop() throws IOException;
-
-  /**
-   * Get metadata store.
-   * @return metadata store.
-   */
-  @VisibleForTesting
-  MetadataStore getStore();
-
-  /**
-   * Returns the read lock used on Metadata DB.
-   * @return readLock
-   */
-  Lock readLock();
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   * @return writeLock
-   */
-  Lock writeLock();
-
-  /**
-   * Returns the value associated with this key.
-   * @param key - key
-   * @return value
-   */
-  byte[] get(byte[] key) throws IOException;
-
-  /**
-   * Puts a Key into Metadata DB.
-   * @param key   - key
-   * @param value - value
-   */
-  void put(byte[] key, byte[] value) throws IOException;
-
-  /**
-   * Deletes a Key from Metadata DB.
-   * @param key   - key
-   */
-  void delete(byte[] key) throws IOException;
-
-  /**
-   * Atomic write a batch of operations.
-   * @param batch
-   * @throws IOException
-   */
-  void writeBatch(BatchOperation batch) throws IOException;
-
-  /**
-   * Given a volume return the corresponding DB key.
-   * @param volume - Volume name
-   */
-  byte[] getVolumeKey(String volume);
-
-  /**
-   * Given a user return the corresponding DB key.
-   * @param user - User name
-   */
-  byte[] getUserKey(String user);
-
-  /**
-   * Given a volume and bucket, return the corresponding DB key.
-   * @param volume - Volume name
-   * @param bucket - Bucket name
-   */
-  byte[] getBucketKey(String volume, String bucket);
-
-  /**
-   * Given a volume, bucket and a key, return the corresponding DB key.
-   * @param volume - volume name
-   * @param bucket - bucket name
-   * @param key - key name
-   * @return bytes of DB key.
-   */
-  byte[] getDBKeyBytes(String volume, String bucket, String key);
-
-  /**
-   * Returns the DB key name of a deleted key in KSM metadata store.
-   * The name for a deleted key has prefix #deleting# followed by
-   * the actual key name.
-   * @param keyName - key name
-   * @return bytes of DB key.
-   */
-  byte[] getDeletedKeyName(byte[] keyName);
-
-  /**
-   * Returns the DB key name of an open key in KSM metadata store.
-   * The name has the #open# prefix followed by the actual key name.
-   * @param keyName - key name
-   * @param id - the id for this open
-   * @return bytes of DB key.
-   */
-  byte[] getOpenKeyNameBytes(String keyName, int id);
-
-  /**
-   * Returns the full name of a key given volume name, bucket name and key name.
-   * Generally done by padding certain delimiters.
-   *
-   * @param volumeName - volume name
-   * @param bucketName - bucket name
-   * @param keyName - key name
-   * @return the full key name.
-   */
-  String getKeyWithDBPrefix(String volumeName, String bucketName,
-      String keyName);
-
-  /**
-   * Given a volume, check if it is empty,
-   * i.e. there are no buckets inside it.
-   * @param volume - Volume name
-   */
-  boolean isVolumeEmpty(String volume) throws IOException;
-
-  /**
-   * Given a volume/bucket, check if it is empty,
-   * i.e. there are no keys inside it.
-   * @param volume - Volume name
-   * @param  bucket - Bucket name
-   * @return true if the bucket is empty
-   */
-  boolean isBucketEmpty(String volume, String bucket) throws IOException;
-
-  /**
-   * Returns a list of buckets represented by {@link KsmBucketInfo}
-   * in the given volume.
-   *
-   * @param volumeName
-   *   the name of the volume. This argument is required;
-   *   this method returns buckets in the given volume.
-   * @param startBucket
-   *   the start bucket name. Only the buckets whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param bucketPrefix
-   *   bucket name prefix. Only the buckets whose name has
-   *   this prefix will be included in the result.
-   * @param maxNumOfBuckets
-   *   the maximum number of buckets to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of buckets.
-   * @throws IOException
-   */
-  List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
-      String bucketPrefix, int maxNumOfBuckets) throws IOException;
-
-  /**
-   * Returns a list of keys represented by {@link KsmKeyInfo}
-   * in the given bucket.
-   *
-   * @param volumeName
-   *   the name of the volume.
-   * @param bucketName
-   *   the name of the bucket.
-   * @param startKey
-   *   the start key name, only the keys whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param keyPrefix
-   *   key name prefix, only the keys whose name has
-   *   this prefix will be included in the result.
-   * @param maxKeys
-   *   the maximum number of keys to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of keys.
-   * @throws IOException
-   */
-  List<KsmKeyInfo> listKeys(String volumeName,
-      String bucketName, String startKey, String keyPrefix, int maxKeys)
-      throws IOException;
-
-  /**
-   * Returns a list of volumes owned by a given user; if user is null,
-   * returns all volumes.
-   *
-   * @param userName
-   *   volume owner
-   * @param prefix
-   *   the volume prefix used to filter the listing result.
-   * @param startKey
-   *   the start volume name determines where to start listing from,
-   *   this key is excluded from the result.
-   * @param maxKeys
-   *   the maximum number of volumes to return.
-   * @return a list of {@link KsmVolumeArgs}
-   * @throws IOException
-   */
-  List<KsmVolumeArgs> listVolumes(String userName, String prefix,
-      String startKey, int maxKeys) throws IOException;
-
-  /**
-   * Returns a list of pending deletion key info, up to the given count.
-   * Each entry is a {@link BlockGroup}, which contains the info about the
-   * key name and all its associated block IDs. A pending deletion key is
-   * stored with #deleting# prefix in KSM DB.
-   *
-   * @param count max number of keys to return.
-   * @return a list of {@link BlockGroup} represent keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
-
-  /**
-   * Returns a list of all still-open key info, which contains the info about
-   * the key name and all its associated block IDs. An open key is stored
-   * with the #open# prefix in KSM DB.
-   *
-   * @return a list of {@link BlockGroup} representing keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getExpiredOpenKeys() throws IOException;
-}

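The startBucket/maxNumOfBuckets contract documented above supports straightforward pagination: feed the last bucket of each page back in as the next (exclusive) start key. A hypothetical loop, with illustrative names:

    List<KsmBucketInfo> page;
    String startBucket = null;      // null/empty: list from the beginning
    final int pageSize = 100;
    do {
      page = metadataManager.listBuckets(
          "volumeOne", startBucket, "web", pageSize);
      for (KsmBucketInfo bucket : page) {
        process(bucket);            // hypothetical consumer
      }
      if (!page.isEmpty()) {
        // The start key is excluded from the next page, so no overlap.
        startBucket = page.get(page.size() - 1).getBucketName();
      }
    } while (page.size() == pageSize);
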
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
deleted file mode 100644
index df02182..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_DB_CACHE_SIZE_MB;
-
-/**
- * Implementation of the KSM metadata manager.
- */
-public class KSMMetadataManagerImpl implements KSMMetadataManager {
-
-  private final MetadataStore store;
-  private final ReadWriteLock lock;
-  private final long openKeyExpireThresholdMS;
-
-  public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
-    File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
-    final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
-        OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
-    File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
-    this.store = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(ksmDBFile)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
-    this.lock = new ReentrantReadWriteLock();
-    this.openKeyExpireThresholdMS = 1000 * conf.getInt(
-        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
-        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
-  }
-
-  /**
-   * Start metadata manager.
-   */
-  @Override
-  public void start() {
-
-  }
-
-  /**
-   * Stop metadata manager.
-   */
-  @Override
-  public void stop() throws IOException {
-    if (store != null) {
-      store.close();
-    }
-  }
-
-  /**
-   * Get metadata store.
-   * @return store - metadata store.
-   */
-  @VisibleForTesting
-  @Override
-  public MetadataStore getStore() {
-    return store;
-  }
-
-  /**
-   * Given a volume return the corresponding DB key.
-   * @param volume - Volume name
-   */
-  public byte[] getVolumeKey(String volume) {
-    String dbVolumeName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
-    return DFSUtil.string2Bytes(dbVolumeName);
-  }
-
-  /**
-   * Given a user return the corresponding DB key.
-   * @param user - User name
-   */
-  public byte[] getUserKey(String user) {
-    String dbUserName = OzoneConsts.KSM_USER_PREFIX + user;
-    return DFSUtil.string2Bytes(dbUserName);
-  }
-
-  /**
-   * Given a volume and bucket, return the corresponding DB key.
-   * @param volume - Volume name
-   * @param bucket - Bucket name
-   */
-  public byte[] getBucketKey(String volume, String bucket) {
-    String bucketKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
-    return DFSUtil.string2Bytes(bucketKeyString);
-  }
-
-  /**
-   * Given a volume and an optional bucket name prefix, returns the DB key
-   * prefix used to scan that volume's buckets.
-   * @param volume - Volume name
-   * @param bucket - Bucket name prefix, may be null or empty
-   * @return DB key prefix string
-   */
-  private String getBucketWithDBPrefix(String volume, String bucket) {
-    StringBuffer sb = new StringBuffer();
-    sb.append(OzoneConsts.KSM_VOLUME_PREFIX)
-        .append(volume)
-        .append(OzoneConsts.KSM_BUCKET_PREFIX);
-    if (!Strings.isNullOrEmpty(bucket)) {
-      sb.append(bucket);
-    }
-    return sb.toString();
-  }
-
-  @Override
-  public String getKeyWithDBPrefix(String volume, String bucket, String key) {
-    String keyVB = OzoneConsts.KSM_KEY_PREFIX + volume
-        + OzoneConsts.KSM_KEY_PREFIX + bucket
-        + OzoneConsts.KSM_KEY_PREFIX;
-    return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key;
-  }
-
-  @Override
-  public byte[] getDBKeyBytes(String volume, String bucket, String key) {
-    return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key));
-  }
-
-  @Override
-  public byte[] getDeletedKeyName(byte[] keyName) {
-    return DFSUtil.string2Bytes(
-        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
-  }
-
-  @Override
-  public byte[] getOpenKeyNameBytes(String keyName, int id) {
-    return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
-        OPEN_KEY_ID_DELIMINATOR + keyName);
-  }
-
-  /**
-   * Returns the read lock used on Metadata DB.
-   * @return readLock
-   */
-  @Override
-  public Lock readLock() {
-    return lock.readLock();
-  }
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   * @return writeLock
-   */
-  @Override
-  public Lock writeLock() {
-    return lock.writeLock();
-  }
-
-  /**
-   * Returns the value associated with this key.
-   * @param key - key
-   * @return value
-   */
-  @Override
-  public byte[] get(byte[] key) throws IOException {
-    return store.get(key);
-  }
-
-  /**
-   * Puts a Key into Metadata DB.
-   * @param key   - key
-   * @param value - value
-   */
-  @Override
-  public void put(byte[] key, byte[] value) throws IOException {
-    store.put(key, value);
-  }
-
-  /**
-   * Deletes a Key from Metadata DB.
-   * @param key   - key
-   */
-  public void delete(byte[] key) throws IOException {
-    store.delete(key);
-  }
-
-  @Override
-  public void writeBatch(BatchOperation batch) throws IOException {
-    this.store.writeBatch(batch);
-  }
-
-  /**
-   * Given a volume, check if it is empty, i.e. there are no buckets inside it.
-   * @param volume - Volume name
-   * @return true if the volume is empty
-   */
-  public boolean isVolumeEmpty(String volume) throws IOException {
-    String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX;
-    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
-    ImmutablePair<byte[], byte[]> volumeRoot =
-        store.peekAround(0, dbVolumeRootKey);
-    if (volumeRoot != null) {
-      return !DFSUtil.bytes2String(volumeRoot.getKey())
-          .startsWith(dbVolumeRootName);
-    }
-    return true;
-  }
-
-  /**
-   * Given a volume/bucket, check if it is empty,
-   * i.e. there are no keys inside it.
-   * @param volume - Volume name
-   * @param bucket - Bucket name
-   * @return true if the bucket is empty
-   */
-  public boolean isBucketEmpty(String volume, String bucket)
-      throws IOException {
-    String keyRootName = getKeyWithDBPrefix(volume, bucket, null);
-    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
-    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot);
-    if (firstKey != null) {
-      return !DFSUtil.bytes2String(firstKey.getKey())
-          .startsWith(keyRootName);
-    }
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KsmBucketInfo> listBuckets(final String volumeName,
-      final String startBucket, final String bucketPrefix,
-      final int maxNumOfBuckets) throws IOException {
-    List<KsmBucketInfo> result = new ArrayList<>();
-    if (Strings.isNullOrEmpty(volumeName)) {
-      throw new KSMException("Volume name is required.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-    byte[] volumeNameBytes = getVolumeKey(volumeName);
-    if (store.get(volumeNameBytes) == null) {
-      throw new KSMException("Volume " + volumeName + " not found.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-
-    // A bucket starts with /#volume/#bucket_prefix
-    MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
-      if (currentKey != null) {
-        String bucketNamePrefix =
-                getBucketWithDBPrefix(volumeName, bucketPrefix);
-        String bucket = DFSUtil.bytes2String(currentKey);
-        return bucket.startsWith(bucketNamePrefix);
-      }
-      return false;
-    };
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startBucket)) {
-      // Since we are excluding start key from the result,
-      // the maxNumOfBuckets is incremented.
-      rangeResult = store.getSequentialRangeKVs(
-          getBucketKey(volumeName, startBucket),
-          maxNumOfBuckets + 1, filter);
-      if (!rangeResult.isEmpty()) {
-        //Remove start key from result.
-        rangeResult.remove(0);
-      }
-    } else {
-      rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmBucketInfo info = KsmBucketInfo.getFromProtobuf(
-          BucketInfo.parseFrom(entry.getValue()));
-      result.add(info);
-    }
-    return result;
-  }
-
-  @Override
-  public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
-      String startKey, String keyPrefix, int maxKeys) throws IOException {
-    List<KsmKeyInfo> result = new ArrayList<>();
-    if (Strings.isNullOrEmpty(volumeName)) {
-      throw new KSMException("Volume name is required.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-    if (Strings.isNullOrEmpty(bucketName)) {
-      throw new KSMException("Bucket name is required.",
-          ResultCodes.FAILED_BUCKET_NOT_FOUND);
-    }
-
-    byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
-    if (store.get(bucketNameBytes) == null) {
-      throw new KSMException("Bucket " + bucketName + " not found.",
-          ResultCodes.FAILED_BUCKET_NOT_FOUND);
-    }
-
-    MetadataKeyFilter filter = new KeyPrefixFilter(
-                getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startKey)) {
-      //Since we are excluding start key from the result,
-      // the maxKeys is incremented.
-      rangeResult = store.getSequentialRangeKVs(
-          getDBKeyBytes(volumeName, bucketName, startKey),
-          maxKeys + 1, filter);
-      if (!rangeResult.isEmpty()) {
-        //Remove start key from result.
-        rangeResult.remove(0);
-      }
-    } else {
-      rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmKeyInfo info = KsmKeyInfo.getFromProtobuf(
-          KeyInfo.parseFrom(entry.getValue()));
-      result.add(info);
-    }
-    return result;
-  }
-
-  @Override
-  public List<KsmVolumeArgs> listVolumes(String userName,
-      String prefix, String startKey, int maxKeys) throws IOException {
-    List<KsmVolumeArgs> result = Lists.newArrayList();
-    VolumeList volumes;
-    if (Strings.isNullOrEmpty(userName)) {
-      volumes = getAllVolumes();
-    } else {
-      volumes = getVolumesByUser(userName);
-    }
-
-    if (volumes == null || volumes.getVolumeNamesCount() == 0) {
-      return result;
-    }
-
-    boolean startKeyFound = Strings.isNullOrEmpty(startKey);
-    for (String volumeName : volumes.getVolumeNamesList()) {
-      if (!Strings.isNullOrEmpty(prefix)) {
-        if (!volumeName.startsWith(prefix)) {
-          continue;
-        }
-      }
-
-      if (!startKeyFound && volumeName.equals(startKey)) {
-        startKeyFound = true;
-        continue;
-      }
-      if (startKeyFound && result.size() < maxKeys) {
-        byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
-        if (volumeInfo == null) {
-          // Could not get volume info by given volume name,
-          // since the volume name is loaded from db,
-          // this probably means ksm db is corrupted or some entries are
-          // accidentally removed.
-          throw new KSMException("Volume info not found for " + volumeName,
-              ResultCodes.FAILED_VOLUME_NOT_FOUND);
-        }
-        VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
-        KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(info);
-        result.add(volumeArgs);
-      }
-    }
-
-    return result;
-  }
-
-  private VolumeList getVolumesByUser(String userName)
-      throws KSMException {
-    return getVolumesByUser(getUserKey(userName));
-  }
-
-  private VolumeList getVolumesByUser(byte[] userNameKey)
-      throws KSMException {
-    VolumeList volumes = null;
-    try {
-      byte[] volumesInBytes = store.get(userNameKey);
-      if (volumesInBytes == null) {
-        // No volume found for this user, return an empty list
-        return VolumeList.newBuilder().build();
-      }
-      volumes = VolumeList.parseFrom(volumesInBytes);
-    } catch (IOException e) {
-      throw new KSMException("Unable to get volumes info by the given user, "
-          + "metadata might be corrupted", e,
-          ResultCodes.FAILED_METADATA_ERROR);
-    }
-    return volumes;
-  }
-
-  private VolumeList getAllVolumes() throws IOException {
-    // Scan all users in database
-    KeyPrefixFilter filter = new KeyPrefixFilter(OzoneConsts.KSM_USER_PREFIX);
-    // We are not expecting a huge number of users per cluster,
-    // so it should be fine to scan all users in the db and collect
-    // each user's list of volume names.
-    List<Map.Entry<byte[], byte[]>> rangeKVs = store
-        .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter);
-
-    VolumeList.Builder builder = VolumeList.newBuilder();
-    for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
-      VolumeList volumes = this.getVolumesByUser(entry.getKey());
-      builder.addAllVolumeNames(volumes.getVolumeNamesList());
-    }
-
-    return builder.build();
-  }
-
-  @Override
-  public List<BlockGroup> getPendingDeletionKeys(final int count)
-      throws IOException {
-    List<BlockGroup> keyBlocksList = Lists.newArrayList();
-    List<Map.Entry<byte[], byte[]>> rangeResult =
-        store.getRangeKVs(null, count,
-            MetadataKeyFilters.getDeletingKeyFilter());
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmKeyInfo info =
-          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
-      // Get block keys as a list.
-      KsmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
-      if (latest == null) {
-        return Collections.emptyList();
-      }
-      List<String> item = latest.getLocationList().stream()
-          .map(KsmKeyLocationInfo::getBlockID)
-          .collect(Collectors.toList());
-      BlockGroup keyBlocks = BlockGroup.newBuilder()
-          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
-          .addAllBlockIDs(item)
-          .build();
-      keyBlocksList.add(keyBlocks);
-    }
-    return keyBlocksList;
-  }
-
-  @Override
-  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
-    List<BlockGroup> keyBlocksList = Lists.newArrayList();
-    long now = Time.now();
-    final MetadataKeyFilter openKeyFilter =
-        new KeyPrefixFilter(OPEN_KEY_PREFIX);
-    List<Map.Entry<byte[], byte[]>> rangeResult =
-        store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
-            openKeyFilter);
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmKeyInfo info =
-          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
-      long lastModify = info.getModificationTime();
-      if (now - lastModify < this.openKeyExpireThresholdMS) {
-        // Consider the key as possibly still active, not hanging.
-        continue;
-      }
-      // Get block keys as a list.
-      List<String> item = info.getLatestVersionLocations()
-          .getBlocksLatestVersionOnly().stream()
-          .map(KsmKeyLocationInfo::getBlockID)
-          .collect(Collectors.toList());
-      BlockGroup keyBlocks = BlockGroup.newBuilder()
-          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
-          .addAllBlockIDs(item)
-          .build();
-      keyBlocksList.add(keyBlocks);
-    }
-    return keyBlocksList;
-  }
-}

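Putting the key-construction methods above together, the flat key space looks roughly as follows. The literal prefix values live in OzoneConsts and are not shown in this diff; "/" for volume/bucket/key, "$" for users, and the #deleting#/#open# prefixes are assumptions based on the javadoc:

    mm.getVolumeKey("volumeOne");                 // "/volumeOne"
    mm.getUserKey("alice");                       // "$alice"
    mm.getBucketKey("volumeOne", "bucketOne");    // "/volumeOne/bucketOne"
    mm.getDBKeyBytes("volumeOne", "bucketOne", "key1");
                                                  // "/volumeOne/bucketOne/key1"
    mm.getDeletedKeyName(keyBytes);               // "#deleting#" + key
    mm.getOpenKeyNameBytes(keyName, 7);           // "#open#" + 7 + delim + key
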
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
deleted file mode 100644
index bd29012..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-
-/**
- * This class is for maintaining KeySpaceManager statistics.
- */
-@InterfaceAudience.Private
-@Metrics(about="Key Space Manager Metrics", context="dfs")
-public class KSMMetrics {
-  private static final String SOURCE_NAME =
-      KSMMetrics.class.getSimpleName();
-
-  // KSM request type op metrics
-  private @Metric MutableCounterLong numVolumeOps;
-  private @Metric MutableCounterLong numBucketOps;
-  private @Metric MutableCounterLong numKeyOps;
-
-  // KSM op metrics
-  private @Metric MutableCounterLong numVolumeCreates;
-  private @Metric MutableCounterLong numVolumeUpdates;
-  private @Metric MutableCounterLong numVolumeInfos;
-  private @Metric MutableCounterLong numVolumeCheckAccesses;
-  private @Metric MutableCounterLong numBucketCreates;
-  private @Metric MutableCounterLong numVolumeDeletes;
-  private @Metric MutableCounterLong numBucketInfos;
-  private @Metric MutableCounterLong numBucketUpdates;
-  private @Metric MutableCounterLong numBucketDeletes;
-  private @Metric MutableCounterLong numKeyAllocate;
-  private @Metric MutableCounterLong numKeyLookup;
-  private @Metric MutableCounterLong numKeyDeletes;
-  private @Metric MutableCounterLong numBucketLists;
-  private @Metric MutableCounterLong numKeyLists;
-  private @Metric MutableCounterLong numVolumeLists;
-  private @Metric MutableCounterLong numKeyCommits;
-  private @Metric MutableCounterLong numAllocateBlockCalls;
-  private @Metric MutableCounterLong numGetServiceLists;
-
-  // Failure Metrics
-  private @Metric MutableCounterLong numVolumeCreateFails;
-  private @Metric MutableCounterLong numVolumeUpdateFails;
-  private @Metric MutableCounterLong numVolumeInfoFails;
-  private @Metric MutableCounterLong numVolumeDeleteFails;
-  private @Metric MutableCounterLong numBucketCreateFails;
-  private @Metric MutableCounterLong numVolumeCheckAccessFails;
-  private @Metric MutableCounterLong numBucketInfoFails;
-  private @Metric MutableCounterLong numBucketUpdateFails;
-  private @Metric MutableCounterLong numBucketDeleteFails;
-  private @Metric MutableCounterLong numKeyAllocateFails;
-  private @Metric MutableCounterLong numKeyLookupFails;
-  private @Metric MutableCounterLong numKeyDeleteFails;
-  private @Metric MutableCounterLong numBucketListFails;
-  private @Metric MutableCounterLong numKeyListFails;
-  private @Metric MutableCounterLong numVolumeListFails;
-  private @Metric MutableCounterLong numKeyCommitFails;
-  private @Metric MutableCounterLong numBlockAllocateCallFails;
-  private @Metric MutableCounterLong numGetServiceListFails;
-
-  public KSMMetrics() {
-  }
-
-  public static KSMMetrics create() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register(SOURCE_NAME,
-        "Key Space Manager Metrics",
-        new KSMMetrics());
-  }
-
-  public void incNumVolumeCreates() {
-    numVolumeOps.incr();
-    numVolumeCreates.incr();
-  }
-
-  public void incNumVolumeUpdates() {
-    numVolumeOps.incr();
-    numVolumeUpdates.incr();
-  }
-
-  public void incNumVolumeInfos() {
-    numVolumeOps.incr();
-    numVolumeInfos.incr();
-  }
-
-  public void incNumVolumeDeletes() {
-    numVolumeOps.incr();
-    numVolumeDeletes.incr();
-  }
-
-  public void incNumVolumeCheckAccesses() {
-    numVolumeOps.incr();
-    numVolumeCheckAccesses.incr();
-  }
-
-  public void incNumBucketCreates() {
-    numBucketOps.incr();
-    numBucketCreates.incr();
-  }
-
-  public void incNumBucketInfos() {
-    numBucketOps.incr();
-    numBucketInfos.incr();
-  }
-
-  public void incNumBucketUpdates() {
-    numBucketOps.incr();
-    numBucketUpdates.incr();
-  }
-
-  public void incNumBucketDeletes() {
-    numBucketOps.incr();
-    numBucketDeletes.incr();
-  }
-
-  public void incNumBucketLists() {
-    numBucketOps.incr();
-    numBucketLists.incr();
-  }
-
-  public void incNumKeyLists() {
-    numKeyOps.incr();
-    numKeyLists.incr();
-  }
-
-  public void incNumVolumeLists() {
-    numVolumeOps.incr();
-    numVolumeLists.incr();
-  }
-
-  public void incNumGetServiceLists() {
-    numGetServiceLists.incr();
-  }
-
-  public void incNumVolumeCreateFails() {
-    numVolumeCreateFails.incr();
-  }
-
-  public void incNumVolumeUpdateFails() {
-    numVolumeUpdateFails.incr();
-  }
-
-  public void incNumVolumeInfoFails() {
-    numVolumeInfoFails.incr();
-  }
-
-  public void incNumVolumeDeleteFails() {
-    numVolumeDeleteFails.incr();
-  }
-
-  public void incNumVolumeCheckAccessFails() {
-    numVolumeCheckAccessFails.incr();
-  }
-
-  public void incNumBucketCreateFails() {
-    numBucketCreateFails.incr();
-  }
-
-  public void incNumBucketInfoFails() {
-    numBucketInfoFails.incr();
-  }
-
-  public void incNumBucketUpdateFails() {
-    numBucketUpdateFails.incr();
-  }
-
-  public void incNumBucketDeleteFails() {
-    numBucketDeleteFails.incr();
-  }
-
-  public void incNumKeyAllocates() {
-    numKeyOps.incr();
-    numKeyAllocate.incr();
-  }
-
-  public void incNumKeyAllocateFails() {
-    numKeyAllocateFails.incr();
-  }
-
-  public void incNumKeyLookups() {
-    numKeyOps.incr();
-    numKeyLookup.incr();
-  }
-
-  public void incNumKeyLookupFails() {
-    numKeyLookupFails.incr();
-  }
-
-  public void incNumKeyDeleteFails() {
-    numKeyDeleteFails.incr();
-  }
-
-  public void incNumKeyDeletes() {
-    numKeyOps.incr();
-    numKeyDeletes.incr();
-  }
-
-  public void incNumKeyCommits() {
-    numKeyOps.incr();
-    numKeyCommits.incr();
-  }
-
-  public void incNumKeyCommitFails() {
-    numKeyCommitFails.incr();
-  }
-
-  public void incNumBlockAllocateCalls() {
-    numAllocateBlockCalls.incr();
-  }
-
-  public void incNumBlockAllocateCallFails() {
-    numBlockAllocateCallFails.incr();
-  }
-
-  public void incNumBucketListFails() {
-    numBucketListFails.incr();
-  }
-
-  public void incNumKeyListFails() {
-    numKeyListFails.incr();
-  }
-
-  public void incNumVolumeListFails() {
-    numVolumeListFails.incr();
-  }
-
-  public void incNumGetServiceListFails() {
-    numGetServiceListFails.incr();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCreates() {
-    return numVolumeCreates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeUpdates() {
-    return numVolumeUpdates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeInfos() {
-    return numVolumeInfos.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeDeletes() {
-    return numVolumeDeletes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCheckAccesses() {
-    return numVolumeCheckAccesses.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketCreates() {
-    return numBucketCreates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketInfos() {
-    return numBucketInfos.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketUpdates() {
-    return numBucketUpdates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketDeletes() {
-    return numBucketDeletes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketLists() {
-    return numBucketLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeLists() {
-    return numVolumeLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyLists() {
-    return numKeyLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumGetServiceLists() {
-    return numGetServiceLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCreateFails() {
-    return numVolumeCreateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeUpdateFails() {
-    return numVolumeUpdateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeInfoFails() {
-    return numVolumeInfoFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeDeleteFails() {
-    return numVolumeDeleteFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCheckAccessFails() {
-    return numVolumeCheckAccessFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketCreateFails() {
-    return numBucketCreateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketInfoFails() {
-    return numBucketInfoFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketUpdateFails() {
-    return numBucketUpdateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketDeleteFails() {
-    return numBucketDeleteFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyAllocates() {
-    return numKeyAllocate.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyAllocateFails() {
-    return numKeyAllocateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyLookups() {
-    return numKeyLookup.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyLookupFails() {
-    return numKeyLookupFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyDeletes() {
-    return numKeyDeletes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyDeletesFails() {
-    return numKeyDeleteFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketListFails() {
-    return numBucketListFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyListFails() {
-    return numKeyListFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeListFails() {
-    return numVolumeListFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyCommits() {
-    return numKeyCommits.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyCommitFails() {
-    return numKeyCommitFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBlockAllocates() {
-    return numAllocateBlockCalls.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBlockAllocateFails() {
-    return numBlockAllocateCallFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumGetServiceListFails() {
-    return numGetServiceListFails.value();
-  }
-
-  public void unRegister() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    ms.unregisterSource(SOURCE_NAME);
-  }
-}

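These counters pair up in the usual Hadoop metrics2 pattern: the op counter is bumped on entry and the matching failure counter on the error path. A hypothetical handler showing the intent (bucketManager is an assumed collaborator):

    private final KSMMetrics metrics = KSMMetrics.create();

    public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
      metrics.incNumBucketCreates();       // counts the attempt (op + create)
      try {
        bucketManager.createBucket(bucketInfo);
      } catch (IOException ex) {
        metrics.incNumBucketCreateFails(); // attempt was already counted above
        throw ex;
      }
    }
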
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java
deleted file mode 100644
index 8219212..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType;
-import org.apache.hadoop.ozone.scm.SCMStorage;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.UUID;
-
-/**
- * KSMStorage is responsible for management of the StorageDirectories used by
- * the KSM.
- */
-public class KSMStorage extends Storage {
-
-  public static final String STORAGE_DIR = "ksm";
-  public static final String KSM_ID = "ksmUuid";
-
-  /**
-   * Construct KSMStorage.
-   * @throws IOException if any directories are inaccessible.
-   */
-  public KSMStorage(OzoneConfiguration conf) throws IOException {
-    super(NodeType.KSM, OzoneUtils.getOzoneMetaDirPath(conf), STORAGE_DIR);
-  }
-
-  public void setScmId(String scmId) throws IOException {
-    if (getState() == StorageState.INITIALIZED) {
-      throw new IOException("KSM is already initialized.");
-    } else {
-      getStorageInfo().setProperty(SCMStorage.SCM_ID, scmId);
-    }
-  }
-
-  public void setKsmId(String ksmId) throws IOException {
-    if (getState() == StorageState.INITIALIZED) {
-      throw new IOException("KSM is already initialized.");
-    } else {
-      getStorageInfo().setProperty(KSM_ID, ksmId);
-    }
-  }
-
-  /**
-   * Retrieves the SCM ID from the version file.
-   * @return SCM_ID
-   */
-  public String getScmId() {
-    return getStorageInfo().getProperty(SCMStorage.SCM_ID);
-  }
-
-  /**
-   * Retrieves the KSM ID from the version file.
-   * @return KSM_ID
-   */
-  public String getKsmId() {
-    return getStorageInfo().getProperty(KSM_ID);
-  }
-
-  @Override
-  protected Properties getNodeProperties() {
-    String ksmId = getKsmId();
-    if (ksmId == null) {
-      ksmId = UUID.randomUUID().toString();
-    }
-    Properties ksmProperties = new Properties();
-    ksmProperties.setProperty(KSM_ID, ksmId);
-    return ksmProperties;
-  }
-}
\ No newline at end of file
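
KSMStorage persists the KSM and SCM IDs into a version file via the Storage base class: the setters refuse to run once the state is INITIALIZED, and getNodeProperties() generates a fresh UUID if none was set. A hedged sketch of how KSM initialization might drive this class; the Storage.initialize() call and the scmId parameter are assumptions based on the base-class pattern, not shown in this patch.

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.common.Storage;

public final class KsmStorageInitSketch {
  private KsmStorageInitSketch() {
  }

  static void initStorage(OzoneConfiguration conf, String scmId)
      throws IOException {
    KSMStorage ksmStorage = new KSMStorage(conf);
    if (ksmStorage.getState() != Storage.StorageState.INITIALIZED) {
      // Both IDs must be set before initialization; the setters above
      // throw once the storage directory is already initialized.
      ksmStorage.setScmId(scmId); // obtained from SCM (assumed)
      ksmStorage.setKsmId(UUID.randomUUID().toString());
      ksmStorage.initialize(); // assumed base-class API; writes VERSION
    }
  }
}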

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
deleted file mode 100644
index 3beaed4..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BackgroundTask;
-import org.apache.hadoop.utils.BackgroundTaskQueue;
-import org.apache.hadoop.utils.BackgroundTaskResult;
-import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
-import static 
org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
-
-/**
- * This is the background service to delete keys. It periodically scans
- * the KSM metadata for keys with the "#deleting" prefix and asks SCM to
- * delete the corresponding blocks; if SCM returns success for a key,
- * that key is then cleaned up from the KSM DB.
- */
-public class KeyDeletingService extends BackgroundService {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(KeyDeletingService.class);
-
-  // The thread pool size for key deleting service.
-  private static final int KEY_DELETING_CORE_POOL_SIZE = 2;
-
-  private final ScmBlockLocationProtocol scmClient;
-  private final KeyManager manager;
-  private final int keyLimitPerTask;
-
-  public KeyDeletingService(ScmBlockLocationProtocol scmClient,
-      KeyManager manager, long serviceInterval,
-      long serviceTimeout, Configuration conf) {
-    super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
-        KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
-    this.scmClient = scmClient;
-    this.manager = manager;
-    this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
-        OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
-  }
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new KeyDeletingTask());
-    return queue;
-  }
-
-  /**
-   * A key deleting task scans the KSM DB for a certain number of
-   * pending-deletion keys and sends these keys along with their associated
-   * blocks to SCM for deletion. Once SCM confirms the keys are deleted (i.e.
-   * once SCM has persisted the block info in its deletedBlockLog), the task
-   * removes these keys from the DB.
-   */
-  private class KeyDeletingTask implements
-      BackgroundTask<BackgroundTaskResult> {
-
-    @Override
-    public int getPriority() {
-      return 0;
-    }
-
-    @Override
-    public BackgroundTaskResult call() throws Exception {
-      try {
-        long startTime = Time.monotonicNow();
-        List<BlockGroup> keyBlocksList = manager
-            .getPendingDeletionKeys(keyLimitPerTask);
-        if (keyBlocksList.size() > 0) {
-          LOG.info("Found {} to-delete keys in KSM", keyBlocksList.size());
-          List<DeleteBlockGroupResult> results =
-              scmClient.deleteKeyBlocks(keyBlocksList);
-          for (DeleteBlockGroupResult result : results) {
-            if (result.isSuccess()) {
-              try {
-                // Purge key from KSM DB.
-                manager.deletePendingDeletionKey(result.getObjectKey());
-                LOG.debug("Key {} deleted from KSM DB", result.getObjectKey());
-              } catch (IOException e) {
-                // If a pending-deletion key fails to delete, log a
-                // warning and retain it in this state so that deletion
-                // can be retried in the next interval.
-                LOG.warn("Failed to delete pending-deletion key {}",
-                    result.getObjectKey(), e);
-              }
-            } else {
-              // Key deletion failed, retry in next interval.
-              LOG.warn("Key {} deletion failed because some of the blocks"
-                  + " were failed to delete, failed blocks: {}",
-                  result.getObjectKey(),
-                  String.join(",", result.getFailedBlocks()));
-            }
-          }
-
-          if (!results.isEmpty()) {
-            LOG.info("Number of key deleted from KSM DB: {},"
-                + " task elapsed time: {}ms",
-                results.size(), Time.monotonicNow() - startTime);
-          }
-
-          return results::size;
-        } else {
-          LOG.debug("No pending deletion key found in KSM");
-        }
-      } catch (IOException e) {
-        LOG.error("Unable to get pending deletion keys, retry in"
-            + " next interval", e);
-      }
-      return EmptyTaskResult.newResult();
-    }
-  }
-}
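
The service above is a BackgroundService: getTasks() is polled once per interval, and each KeyDeletingTask runs one scan-delete-purge round against SCM. A hedged sketch of wiring it up; the start() call is assumed from the BackgroundService base class, and the interval/timeout values are illustrative, not the configured defaults.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;

public final class KeyDeletingServiceSketch {
  private KeyDeletingServiceSketch() {
  }

  static KeyDeletingService startService(ScmBlockLocationProtocol scmClient,
      KeyManager keyManager) {
    Configuration conf = new OzoneConfiguration();
    long serviceInterval = TimeUnit.MINUTES.toMillis(1); // assumed value
    long serviceTimeout = TimeUnit.MINUTES.toMillis(5);  // assumed value
    KeyDeletingService service = new KeyDeletingService(
        scmClient, keyManager, serviceInterval, serviceTimeout, conf);
    // BackgroundService schedules getTasks() every serviceInterval ms;
    // start() is assumed from the base class.
    service.start();
    return service;
  }
}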

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
deleted file mode 100644
index e71ce5f..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Handles key level commands.
- */
-public interface KeyManager {
-
-  /**
-   * Start key manager.
-   */
-  void start();
-
-  /**
-   * Stop key manager.
-   */
-  void stop() throws IOException;
-
-  /**
-   * After calling commit, the key will be made visible. There can be multiple
-   * open key writes in parallel (identified by client id). The most recently
-   * committed one will be the one visible.
-   *
-   * @param args the key to commit.
-   * @param clientID the client that is committing.
-   * @throws IOException
-   */
-  void commitKey(KsmKeyArgs args, int clientID) throws IOException;
-
-  /**
-   * A client calls this on an open key to request allocation of a new block,
-   * which is appended to the tail of the open key's current block list.
-   *
-   * @param args the key to append
-   * @param clientID the client requesting block.
-   * @return the reference to the new block.
-   * @throws IOException
-   */
-  KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
-      throws IOException;
-  /**
-   * Given the args of a key to put, write an open key entry to the metadata.
-   *
-   * If container creation or the key write fails on
-   * DistributedStorageHandler, this key's metadata will still remain in KSM.
-   * TODO garbage collect the open keys that never get closed
-   *
-   * @param args the args of the key provided by the client.
-   * @return an OpenKeySession instance the client uses to talk to the
-   * container.
-   * @throws IOException
-   */
-  OpenKeySession openKey(KsmKeyArgs args) throws IOException;
-
-  /**
-   * Look up an existing key. Return the key's info to the client side, which
-   * DistributedStorageHandler will use to access the data on the datanodes.
-   *
-   * @param args the args of the key provided by the client.
-   * @return a KsmKeyInfo instance the client uses to talk to the container.
-   * @throws IOException
-   */
-  KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
-
-  /**
-   * Deletes an object by its object key. The key is immediately removed
-   * from the KSM namespace and becomes invisible to clients. The object data
-   * is removed asynchronously and may be retained for some time.
-   *
-   * @param args the args of the key provided by the client.
-   * @throws IOException if the specified key doesn't exist or
-   * other I/O errors occur while deleting the object.
-   */
-  void deleteKey(KsmKeyArgs args) throws IOException;
-
-  /**
-   * Returns a list of keys represented by {@link KsmKeyInfo}
-   * in the given bucket.
-   *
-   * @param volumeName
-   *   the name of the volume.
-   * @param bucketName
-   *   the name of the bucket.
-   * @param startKey
-   *   the start key name, only the keys whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param keyPrefix
-   *   key name prefix, only the keys whose name has
-   *   this prefix will be included in the result.
-   * @param maxKeys
-   *   the maximum number of keys to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of keys.
-   * @throws IOException
-   */
-  List<KsmKeyInfo> listKeys(String volumeName,
-      String bucketName, String startKey, String keyPrefix, int maxKeys)
-      throws IOException;
-
-  /**
-   * Returns a list of pending-deletion key info, up to the given count.
-   * Each entry is a {@link BlockGroup}, which contains the info about the
-   * key name and all its associated block IDs. A pending-deletion key is
-   * stored with the #deleting# prefix in the KSM DB.
-   *
-   * @param count max number of keys to return.
-   * @return a list of {@link BlockGroup} representing keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
-
-  /**
-   * Deletes a pending-deletion key by its name. This is called once the
-   * key can be safely deleted from this layer. After the call, all
-   * footprints of the key will be purged from the KSM DB.
-   *
-   * @param objectKeyName object key name with the #deleting# prefix.
-   * @throws IOException if the specified key doesn't exist or other I/O
-   * errors occur.
-   */
-  void deletePendingDeletionKey(String objectKeyName) throws IOException;
-
-  /**
-   * Returns the info of all still-open keys. Each entry contains the key
-   * name and all its associated block IDs. A pending open key is stored
-   * with the #open# prefix in the KSM DB.
-   *
-   * @return a list of {@link BlockGroup} representing keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getExpiredOpenKeys() throws IOException;
-
-  /**
-   * Deletes an expired open key by its name. Called when a hanging key has
-   * been lingering for too long. Once called, the open key entries are
-   * removed from the KSM metadata.
-   *
-   * @param objectKeyName object key name with the #open# prefix.
-   * @throws IOException if the specified key doesn't exist or other I/O
-   * errors occur.
-   */
-  void deleteExpiredOpenKey(String objectKeyName) throws IOException;
-}
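
To tie the interface together: a client first opens a key, optionally allocates more blocks while writing, and finally commits to make the key visible. A hedged sketch of that open -> allocate -> commit sequence follows; the KsmKeyArgs.Builder methods, the OpenKeySession.getId() accessor, and passing null as the startKey to listKeys are assumptions about the helper classes, which are not part of this diff.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;

public final class KeyLifecycleSketch {
  private KeyLifecycleSketch() {
  }

  static void writeKey(KeyManager keyManager) throws IOException {
    KsmKeyArgs args = new KsmKeyArgs.Builder() // builder is assumed
        .setVolumeName("vol1")
        .setBucketName("bucket1")
        .setKeyName("key1")
        .setDataSize(1024)
        .build();

    // openKey writes an #open# entry; the key is not yet visible.
    OpenKeySession session = keyManager.openKey(args);

    // While writing, the client may request additional blocks; each is
    // appended to the tail of the open key's block list.
    KsmKeyLocationInfo newBlock =
        keyManager.allocateBlock(args, session.getId());

    // commitKey makes the key visible; with parallel open writes, the
    // most recently committed one wins.
    keyManager.commitKey(args, session.getId());

    // Once committed, the key shows up in listings (null startKey is
    // assumed to mean "from the beginning of the bucket").
    List<KsmKeyInfo> keys =
        keyManager.listKeys("vol1", "bucket1", null, "key", 10);
  }
}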

