http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
deleted file mode 100644
index 2ff6be9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
+++ /dev/null
@@ -1,651 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ksm.protocolPB;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.BucketArgs;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.BucketInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CreateBucketRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CreateBucketResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.InfoBucketRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.InfoBucketResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.SetBucketPropertyRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.SetBucketPropertyResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.DeleteBucketRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.DeleteBucketResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.LocateKeyRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.LocateKeyResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.KeyArgs;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.DeleteVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.DeleteVolumeResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.InfoVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.InfoVolumeResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.ListBucketsRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.ListBucketsResponse;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysRequest;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.Status;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
-    .ListVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
-    .ListVolumeResponse;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.stream.Collectors;
-
-/**
- *  The client side implementation of KeySpaceManagerProtocol.
- */
-
-@InterfaceAudience.Private
-public final class KeySpaceManagerProtocolClientSideTranslatorPB
-    implements KeySpaceManagerProtocol, ProtocolTranslator, Closeable {
-
-  /**
-   * RpcController is not used and hence is set to null.
-   */
-  private static final RpcController NULL_RPC_CONTROLLER = null;
-
-  private final KeySpaceManagerProtocolPB rpcProxy;
-
-  /**
-   * Constructor for KeySpaceManger Client.
-   * @param rpcProxy
-   */
-  public KeySpaceManagerProtocolClientSideTranslatorPB(
-      KeySpaceManagerProtocolPB rpcProxy) {
-    this.rpcProxy = rpcProxy;
-  }
-
-  /**
-   * Closes this stream and releases any system resources associated
-   * with it. If the stream is already closed then invoking this
-   * method has no effect.
-   * <p>
-   * <p> As noted in {@link AutoCloseable#close()}, cases where the
-   * close may fail require careful attention. It is strongly advised
-   * to relinquish the underlying resources and to internally
-   * <em>mark</em> the {@code Closeable} as closed, prior to throwing
-   * the {@code IOException}.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override
-  public void close() throws IOException {
-
-  }
-
-  /**
-   * Creates a volume.
-   *
-   * @param args - Arguments to create Volume.
-   * @throws IOException
-   */
-  @Override
-  public void createVolume(KsmVolumeArgs args) throws IOException {
-    CreateVolumeRequest.Builder req =
-        CreateVolumeRequest.newBuilder();
-    VolumeInfo volumeInfo = args.getProtobuf();
-    req.setVolumeInfo(volumeInfo);
-
-    final CreateVolumeResponse resp;
-    try {
-      resp = rpcProxy.createVolume(NULL_RPC_CONTROLLER,
-          req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-
-    if (resp.getStatus() != Status.OK) {
-      throw new
-          IOException("Volume creation failed, error:" + resp.getStatus());
-    }
-  }
-
-  /**
-   * Changes the owner of a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param owner - Name of the owner.
-   * @throws IOException
-   */
-  @Override
-  public void setOwner(String volume, String owner) throws IOException {
-    SetVolumePropertyRequest.Builder req =
-        SetVolumePropertyRequest.newBuilder();
-    req.setVolumeName(volume).setOwnerName(owner);
-    final SetVolumePropertyResponse resp;
-    try {
-      resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new
-          IOException("Volume owner change failed, error:" + resp.getStatus());
-    }
-  }
-
-  /**
-   * Changes the Quota on a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param quota - Quota in bytes.
-   * @throws IOException
-   */
-  @Override
-  public void setQuota(String volume, long quota) throws IOException {
-    SetVolumePropertyRequest.Builder req =
-        SetVolumePropertyRequest.newBuilder();
-    req.setVolumeName(volume).setQuotaInBytes(quota);
-    final SetVolumePropertyResponse resp;
-    try {
-      resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new
-          IOException("Volume quota change failed, error:" + resp.getStatus());
-    }
-  }
-
-  /**
-   * Checks if the specified user can access this volume.
-   *
-   * @param volume - volume
-   * @param userAcl - user acls which needs to be checked for access
-   * @return true if the user has required access for the volume,
-   *         false otherwise
-   * @throws IOException
-   */
-  @Override
-  public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) throws
-      IOException {
-    CheckVolumeAccessRequest.Builder req =
-        CheckVolumeAccessRequest.newBuilder();
-    req.setVolumeName(volume).setUserAcl(userAcl);
-    final CheckVolumeAccessResponse resp;
-    try {
-      resp = rpcProxy.checkVolumeAccess(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-
-    if (resp.getStatus() == Status.ACCESS_DENIED) {
-      return false;
-    } else if (resp.getStatus() == Status.OK) {
-      return true;
-    } else {
-      throw new
-          IOException("Check Volume Access failed, error:" + resp.getStatus());
-    }
-  }
-
-  /**
-   * Gets the volume information.
-   *
-   * @param volume - Volume name.
-   * @return KsmVolumeArgs or exception is thrown.
-   * @throws IOException
-   */
-  @Override
-  public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
-    InfoVolumeRequest.Builder req = InfoVolumeRequest.newBuilder();
-    req.setVolumeName(volume);
-    final InfoVolumeResponse resp;
-    try {
-      resp = rpcProxy.infoVolume(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new
-          IOException("Info Volume failed, error:" + resp.getStatus());
-    }
-    return KsmVolumeArgs.getFromProtobuf(resp.getVolumeInfo());
-  }
-
-  /**
-   * Deletes an existing empty volume.
-   *
-   * @param volume - Name of the volume.
-   * @throws IOException
-   */
-  @Override
-  public void deleteVolume(String volume) throws IOException {
-    DeleteVolumeRequest.Builder req = DeleteVolumeRequest.newBuilder();
-    req.setVolumeName(volume);
-    final DeleteVolumeResponse resp;
-    try {
-      resp = rpcProxy.deleteVolume(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new
-          IOException("Delete Volume failed, error:" + resp.getStatus());
-    }
-  }
-
-  /**
-   * Lists volume owned by a specific user.
-   *
-   * @param userName - user name
-   * @param prefix - Filter prefix -- Return only entries that match this.
-   * @param prevKey - Previous key -- List starts from the next from the
-   * prevkey
-   * @param maxKeys - Max number of keys to return.
-   * @return List of Volumes.
-   * @throws IOException
-   */
-  @Override
-  public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
-                                              String prevKey, int maxKeys)
-      throws IOException {
-    ListVolumeRequest.Builder builder = ListVolumeRequest.newBuilder();
-    if (!Strings.isNullOrEmpty(prefix)) {
-      builder.setPrefix(prefix);
-    }
-    if (!Strings.isNullOrEmpty(prevKey)) {
-      builder.setPrevKey(prevKey);
-    }
-    builder.setMaxKeys(maxKeys);
-    builder.setUserName(userName);
-    builder.setScope(ListVolumeRequest.Scope.VOLUMES_BY_USER);
-    return listVolume(builder.build());
-  }
-
-  /**
-   * Lists volume all volumes in the cluster.
-   *
-   * @param prefix - Filter prefix -- Return only entries that match this.
-   * @param prevKey - Previous key -- List starts from the next from the
-   * prevkey
-   * @param maxKeys - Max number of keys to return.
-   * @return List of Volumes.
-   * @throws IOException
-   */
-  @Override
-  public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey,
-      int maxKeys) throws IOException {
-    ListVolumeRequest.Builder builder = ListVolumeRequest.newBuilder();
-    if (!Strings.isNullOrEmpty(prefix)) {
-      builder.setPrefix(prefix);
-    }
-    if (!Strings.isNullOrEmpty(prevKey)) {
-      builder.setPrevKey(prevKey);
-    }
-    builder.setMaxKeys(maxKeys);
-    builder.setScope(ListVolumeRequest.Scope.VOLUMES_BY_CLUSTER);
-    return listVolume(builder.build());
-  }
-
-  private List<KsmVolumeArgs> listVolume(ListVolumeRequest request)
-      throws IOException {
-    final ListVolumeResponse resp;
-    try {
-      resp = rpcProxy.listVolumes(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-
-    if (resp.getStatus() != Status.OK) {
-      throw new IOException("List volume failed, error: "
-          + resp.getStatus());
-    }
-
-    List<KsmVolumeArgs> result = Lists.newArrayList();
-    for (VolumeInfo volInfo : resp.getVolumeInfoList()) {
-      KsmVolumeArgs volArgs = KsmVolumeArgs.getFromProtobuf(volInfo);
-      result.add(volArgs);
-    }
-
-    return resp.getVolumeInfoList().stream()
-        .map(item -> KsmVolumeArgs.getFromProtobuf(item))
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Creates a bucket.
-   *
-   * @param bucketInfo - BucketInfo to create bucket.
-   * @throws IOException
-   */
-  @Override
-  public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
-    CreateBucketRequest.Builder req =
-        CreateBucketRequest.newBuilder();
-    BucketInfo bucketInfoProtobuf = bucketInfo.getProtobuf();
-    req.setBucketInfo(bucketInfoProtobuf);
-
-    final CreateBucketResponse resp;
-    try {
-      resp = rpcProxy.createBucket(NULL_RPC_CONTROLLER,
-          req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new IOException("Bucket creation failed, error: "
-          + resp.getStatus());
-    }
-  }
-
-  /**
-   * Gets the bucket information.
-   *
-   * @param volume - Volume name.
-   * @param bucket - Bucket name.
-   * @return KsmBucketInfo or exception is thrown.
-   * @throws IOException
-   */
-  @Override
-  public KsmBucketInfo getBucketInfo(String volume, String bucket)
-      throws IOException {
-    InfoBucketRequest.Builder req =
-        InfoBucketRequest.newBuilder();
-    req.setVolumeName(volume);
-    req.setBucketName(bucket);
-
-    final InfoBucketResponse resp;
-    try {
-      resp = rpcProxy.infoBucket(NULL_RPC_CONTROLLER,
-          req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() == Status.OK) {
-      return KsmBucketInfo.getFromProtobuf(resp.getBucketInfo());
-    } else {
-      throw new IOException("Info Bucket failed, error: "
-          + resp.getStatus());
-    }
-  }
-
-  /**
-   * Sets bucket property from args.
-   * @param args - BucketArgs.
-   * @throws IOException
-   */
-  @Override
-  public void setBucketProperty(KsmBucketArgs args)
-      throws IOException {
-    SetBucketPropertyRequest.Builder req =
-        SetBucketPropertyRequest.newBuilder();
-    BucketArgs bucketArgs = args.getProtobuf();
-    req.setBucketArgs(bucketArgs);
-    final SetBucketPropertyResponse resp;
-    try {
-      resp = rpcProxy.setBucketProperty(NULL_RPC_CONTROLLER,
-          req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new IOException("Setting bucket property failed, error: "
-          + resp.getStatus());
-    }
-  }
-
-  /**
-   * List buckets in a volume.
-   *
-   * @param volumeName
-   * @param startKey
-   * @param prefix
-   * @param count
-   * @return
-   * @throws IOException
-   */
-  @Override
-  public List<KsmBucketInfo> listBuckets(String volumeName,
-      String startKey, String prefix, int count) throws IOException {
-    List<KsmBucketInfo> buckets = new ArrayList<>();
-    ListBucketsRequest.Builder reqBuilder = ListBucketsRequest.newBuilder();
-    reqBuilder.setVolumeName(volumeName);
-    reqBuilder.setCount(count);
-    if (startKey != null) {
-      reqBuilder.setStartKey(startKey);
-    }
-    if (prefix != null) {
-      reqBuilder.setPrefix(prefix);
-    }
-    ListBucketsRequest request = reqBuilder.build();
-    final ListBucketsResponse resp;
-    try {
-      resp = rpcProxy.listBuckets(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-
-    if (resp.getStatus() == Status.OK) {
-      buckets.addAll(
-          resp.getBucketInfoList().stream()
-              .map(KsmBucketInfo::getFromProtobuf)
-              .collect(Collectors.toList()));
-      return buckets;
-    } else {
-      throw new IOException("List Buckets failed, error: "
-          + resp.getStatus());
-    }
-  }
-
-  /**
-   * Allocate a block for a key, then use the returned meta info to talk to data
-   * node to actually write the key.
-   * @param args the args for the key to be allocated
-   * @return a handler to the key, returned client
-   * @throws IOException
-   */
-  @Override
-  public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
-    LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
-    KeyArgs keyArgs = KeyArgs.newBuilder()
-        .setVolumeName(args.getVolumeName())
-        .setBucketName(args.getBucketName())
-        .setKeyName(args.getKeyName())
-        .setDataSize(args.getDataSize()).build();
-    req.setKeyArgs(keyArgs);
-
-    final LocateKeyResponse resp;
-    try {
-      resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new IOException("Create key failed, error:" +
-          resp.getStatus());
-    }
-    return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
-  }
-
-  @Override
-  public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
-    LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
-    KeyArgs keyArgs = KeyArgs.newBuilder()
-        .setVolumeName(args.getVolumeName())
-        .setBucketName(args.getBucketName())
-        .setKeyName(args.getKeyName())
-        .setDataSize(args.getDataSize()).build();
-    req.setKeyArgs(keyArgs);
-
-    final LocateKeyResponse resp;
-    try {
-      resp = rpcProxy.lookupKey(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new IOException("Lookup key failed, error:" +
-          resp.getStatus());
-    }
-    return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
-  }
-
-  /**
-   * Deletes an existing key.
-   *
-   * @param args the args of the key.
-   * @throws IOException
-   */
-  @Override
-  public void deleteKey(KsmKeyArgs args) throws IOException {
-    LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
-    KeyArgs keyArgs = KeyArgs.newBuilder()
-        .setVolumeName(args.getVolumeName())
-        .setBucketName(args.getBucketName())
-        .setKeyName(args.getKeyName()).build();
-    req.setKeyArgs(keyArgs);
-
-    final LocateKeyResponse resp;
-    try {
-      resp = rpcProxy.deleteKey(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new IOException("Delete key failed, error:" +
-          resp.getStatus());
-    }
-  }
-
-  /**
-   * Deletes an existing empty bucket from volume.
-   * @param volume - Name of the volume.
-   * @param bucket - Name of the bucket.
-   * @throws IOException
-   */
-  public void deleteBucket(String volume, String bucket) throws IOException {
-    DeleteBucketRequest.Builder req = DeleteBucketRequest.newBuilder();
-    req.setVolumeName(volume);
-    req.setBucketName(bucket);
-    final DeleteBucketResponse resp;
-    try {
-      resp = rpcProxy.deleteBucket(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (resp.getStatus() != Status.OK) {
-      throw new
-          IOException("Delete Bucket failed, error:" + resp.getStatus());
-    }
-  }
-
-  /**
-   * List keys in a bucket.
-   */
-  @Override
-  public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
-      String startKey, String prefix, int maxKeys) throws IOException {
-    List<KsmKeyInfo> keys = new ArrayList<>();
-    ListKeysRequest.Builder reqBuilder = ListKeysRequest.newBuilder();
-    reqBuilder.setVolumeName(volumeName);
-    reqBuilder.setBucketName(bucketName);
-    reqBuilder.setCount(maxKeys);
-
-    if (startKey != null) {
-      reqBuilder.setStartKey(startKey);
-    }
-
-    if (prefix != null) {
-      reqBuilder.setPrefix(prefix);
-    }
-
-    ListKeysRequest request = reqBuilder.build();
-    final ListKeysResponse resp;
-    try {
-      resp = rpcProxy.listKeys(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-
-    if (resp.getStatus() == Status.OK) {
-      keys.addAll(
-          resp.getKeyInfoList().stream()
-              .map(KsmKeyInfo::getFromProtobuf)
-              .collect(Collectors.toList()));
-      return keys;
-    } else {
-      throw new IOException("List Keys failed, error: "
-          + resp.getStatus());
-    }
-  }
-
-  /**
-   * Return the proxy object underlying this protocol translator.
-   *
-   * @return the proxy object underlying this protocol translator.
-   */
-  @Override
-  public Object getUnderlyingProxyObject() {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
deleted file mode 100644
index 8b960a9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ksm.protocolPB;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.KeySpaceManagerService;
-
-/**
- * Protocol used to communicate with KSM.
- */
-@ProtocolInfo(protocolName =
-    "org.apache.hadoop.ozone.protocol.KeySpaceManagerProtocol",
-    protocolVersion = 1)
-@InterfaceAudience.Private
-public interface KeySpaceManagerProtocolPB
-    extends KeySpaceManagerService.BlockingInterface {
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/package-info.java
deleted file mode 100644
index 5dfa5ea..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ksm.protocolPB;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
index 0aca0ad..d712074 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
@@ -84,6 +84,16 @@ public class OzoneAcl {
     return new OzoneAcl(aclType, parts[1], rights);
   }
 
+  /**
+   * Returns this ACL as a colon-separated string, {@code type:name:rights}.
+   *
+   * @return the type, name and rights of this ACL joined by colons.
+   */
+  @Override
+  public String toString() {
+    return type + ":" + name + ":" + rights;
+  }
+
   /**
    * Returns a hash code value for the object. This method is
    * supported for the benefit of hash tables.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
new file mode 100644
index 0000000..55b5f88
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.scm.ScmConfigKeys;
+
+/**
+ * Constants for the configuration keys, and their defaults, used in Ozone.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class OzoneConfigKeys {
+  public static final String DFS_CONTAINER_IPC_PORT =
+      "dfs.container.ipc";
+  public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
+
+  /**
+   * Key that switches the ozone container IPC port to a random free port.
+   * When set to true, allocate a random free port for ozone container,
+   * so that a mini cluster is able to launch multiple containers on a node.
+   *
+   * When set to false (default), container port is fixed as specified by
+   * DFS_CONTAINER_IPC_PORT_DEFAULT.
+   */
+  public static final String DFS_CONTAINER_IPC_RANDOM_PORT =
+      "dfs.container.ipc.random.port";
+  public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
+      false;
+  // Local storage root, ozone on/off switch, handler type and tracing.
+  public static final String OZONE_LOCALSTORAGE_ROOT =
+      "ozone.localstorage.root";
+  public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone";
+  public static final String OZONE_ENABLED =
+      "ozone.enabled";
+  public static final boolean OZONE_ENABLED_DEFAULT = false;
+  public static final String OZONE_HANDLER_TYPE_KEY =
+      "ozone.handler.type";
+  public static final String OZONE_HANDLER_TYPE_DEFAULT = "distributed";
+  public static final String OZONE_TRACE_ENABLED_KEY =
+      "ozone.trace.enabled";
+  public static final boolean OZONE_TRACE_ENABLED_DEFAULT = false;
+  // Key naming the container metadata directories; no default declared here.
+  public static final String OZONE_CONTAINER_METADATA_DIRS =
+      "ozone.container.metadata.dirs";
+  // Metadata store implementation; the default is LevelDB.
+  public static final String OZONE_METADATA_STORE_IMPL =
+      "ozone.metastore.impl";
+  public static final String OZONE_METADATA_STORE_IMPL_LEVELDB =
+      "LevelDB";
+  public static final String OZONE_METADATA_STORE_IMPL_ROCKSDB =
+      "RocksDB";
+  public static final String OZONE_METADATA_STORE_IMPL_DEFAULT =
+      OZONE_METADATA_STORE_IMPL_LEVELDB;
+  // Key cache size and its default.
+  public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
+  public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
+  // SCM block size; the default is 256 * OzoneConsts.MB.
+  public static final String OZONE_SCM_BLOCK_SIZE_KEY =
+      "ozone.scm.block.size";
+  public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256 * OzoneConsts.MB;
+
+  /**
+   * Ozone administrator users delimited by comma.
+   * If not set, only the user who launches an ozone service will be the
+   * admin user. This property must be set if ozone services are started by
+   * different users. Otherwise the RPC layer will reject calls from
+   * other servers which are started by users not in the list.
+   */
+  public static final String OZONE_ADMINISTRATORS =
+      "ozone.administrators";
+  // Client socket and connection timeouts in ms; both default to 5000.
+  public static final String OZONE_CLIENT_SOCKET_TIMEOUT_MS =
+      "ozone.client.socket.timeout.ms";
+  public static final int OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT = 5000;
+  public static final String OZONE_CLIENT_CONNECTION_TIMEOUT_MS =
+      "ozone.client.connection.timeout.ms";
+  public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000;
+  // Ratis settings; the first four re-export ScmConfigKeys constants.
+  public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
+  public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY;
+  public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT;
+  /** A unique ID to identify a Ratis server. */
+  public static final String DFS_CONTAINER_RATIS_SERVER_ID =
+      "dfs.container.ratis.server.id";
+  public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
+      "dfs.container.ratis.datanode.storage.dir";
+  // Key for the kerberos principal used for web authentication.
+  public static final String OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
+      "ozone.web.authentication.kerberos.principal";
+
+  /**
+   * Not instantiable: this class only holds static constants.
+   */
+  private OzoneConfigKeys() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfiguration.java
new file mode 100644
index 0000000..a16e8d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfiguration.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link Configuration} flavour used by ozone. Loading this class registers
+ * the HDFS and ozone resource files as default resources.
+ */
+@InterfaceAudience.Private
+public class OzoneConfiguration extends Configuration {
+
+  /** Default resources, listed in the order in which they are registered. */
+  private static final String[] DEFAULT_RESOURCES = {
+      "hdfs-default.xml",
+      "hdfs-site.xml",
+      "ozone-default.xml",
+      "ozone-site.xml",
+  };
+
+  static {
+    for (String resource : DEFAULT_RESOURCES) {
+      Configuration.addDefaultResource(resource);
+    }
+  }
+
+  /**
+   * Creates a configuration through the no-argument constructor of
+   * {@link Configuration}.
+   */
+  public OzoneConfiguration() {
+    super();
+  }
+
+  /**
+   * Creates a configuration by handing the given one to the matching
+   * {@link Configuration} constructor.
+   *
+   * @param conf configuration passed to the superclass constructor
+   */
+  public OzoneConfiguration(Configuration conf) {
+    super(conf);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
new file mode 100644
index 0000000..bfd5714
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+
+import java.util.List;
+
+/**
+ * Client-side description of a bucket, populated from a
+ * {@link KsmBucketInfo}.
+ */
+public class OzoneBucket {
+
+  /** Name of the volume to which the bucket belongs. */
+  private final String volumeName;
+
+  /** Name of the bucket. */
+  private final String bucketName;
+
+  /** ACLs of the bucket. */
+  private final List<OzoneAcl> acls;
+
+  /**
+   * Storage type used for the bucket.
+   * [RAM_DISK, SSD, DISK, ARCHIVE]
+   */
+  private final StorageType storageType;
+
+  /** Versioning flag of the bucket. */
+  private final Versioning versioning;
+
+  /**
+   * Builds an OzoneBucket from the values exposed by a KsmBucketInfo.
+   *
+   * @param ksmBucketInfo source of the bucket attributes
+   */
+  public OzoneBucket(KsmBucketInfo ksmBucketInfo) {
+    volumeName = ksmBucketInfo.getVolumeName();
+    bucketName = ksmBucketInfo.getBucketName();
+    // The list handed out by KsmBucketInfo is stored as is, not copied.
+    acls = ksmBucketInfo.getAcls();
+    storageType = ksmBucketInfo.getStorageType();
+    if (ksmBucketInfo.getIsVersionEnabled()) {
+      versioning = Versioning.ENABLED;
+    } else {
+      versioning = Versioning.DISABLED;
+    }
+  }
+
+  /**
+   * Gets the name of the volume that holds the bucket.
+   *
+   * @return volumeName
+   */
+  public String getVolumeName() {
+    return this.volumeName;
+  }
+
+  /**
+   * Gets the name of the bucket.
+   *
+   * @return bucketName
+   */
+  public String getBucketName() {
+    return this.bucketName;
+  }
+
+  /**
+   * Gets the ACLs associated with the bucket.
+   *
+   * @return acls
+   */
+  public List<OzoneAcl> getAcls() {
+    return this.acls;
+  }
+
+  /**
+   * Gets the storage type of the bucket.
+   *
+   * @return storageType
+   */
+  public StorageType getStorageType() {
+    return this.storageType;
+  }
+
+  /**
+   * Gets the versioning flag of the bucket.
+   *
+   * @return versioning
+   */
+  public Versioning getVersioning() {
+    return this.versioning;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClient.java
new file mode 100644
index 0000000..a7808d8
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClient.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * OzoneClient can connect to an Ozone Cluster and
+ * perform basic operations.
+ */
+public interface OzoneClient {
+
+  /**
+   * Creates a new Volume.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner and ACLs set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   * @param acls ACLs to be added to the Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner,
+                    OzoneAcl... acls)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner and quota set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   * @param quota Volume Quota
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner,
+                    long quota)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner, quota and ACLs set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   * @param quota Volume Quota
+   * @param acls ACLs to be added to the Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner,
+                    long quota, OzoneAcl... acls)
+      throws IOException;
+
+  /**
+   * Sets the owner of the volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner to be set for the Volume
+   *
+   * @throws IOException
+   */
+  void setVolumeOwner(String volumeName, String owner) throws IOException;
+
+  /**
+   * Set Volume Quota.
+   *
+   * @param volumeName Name of the Volume
+   * @param quota Quota to be set for the Volume
+   *
+   * @throws IOException
+   */
+  void setVolumeQuota(String volumeName, long quota)
+      throws IOException;
+
+  /**
+   * Returns {@link OzoneVolume}.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @return OzoneVolume
+   *
+   * @throws IOException
+   */
+  OzoneVolume getVolumeDetails(String volumeName)
+      throws IOException;
+
+  /**
+   * Checks if a Volume exists and the user with a role specified has access
+   * to the Volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param acl requested acls which needs to be checked for access
+   *
+   * @return Boolean - True if the user with a role can access the volume.
+   * This is possible for owners of the volume and admin users
+   *
+   * @throws IOException
+   */
+  boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
+      throws IOException;
+
+  /**
+   * Deletes an Empty Volume.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @throws IOException
+   */
+  void deleteVolume(String volumeName) throws IOException;
+
+  /**
+   * Returns the List of Volumes owned by current user.
+   *
+   * @param volumePrefix Volume prefix to match
+   *
+   * @return Iterator of {@link OzoneVolume}
+   *
+   * @throws IOException
+   */
+  Iterator<OzoneVolume> listVolumes(String volumePrefix)
+      throws IOException;
+
+  /**
+   * Returns the List of Volumes owned by the specific user.
+   *
+   * @param volumePrefix Volume prefix to match
+   * @param user User Name
+   *
+   * @return Iterator of {@link OzoneVolume}
+   *
+   * @throws IOException
+   */
+  Iterator<OzoneVolume> listVolumes(String volumePrefix, String user)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume, with versioning set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param versioning Bucket versioning
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
+                    Versioning versioning)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume, with storage type set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param storageType StorageType for the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
+                    StorageType storageType)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume, with ACLs set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param acls OzoneAcls for the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
+                           OzoneAcl... acls)
+      throws IOException;
+
+
+  /**
+   * Creates a new Bucket in the Volume, with versioning,
+   * storage type and ACLs set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param versioning Bucket versioning
+   * @param storageType StorageType for the Bucket
+   * @param acls OzoneAcls for the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
+                           Versioning versioning,
+                           StorageType storageType, OzoneAcl... acls)
+      throws IOException;
+
+  /**
+   * Adds ACLs to a Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param addAcls ACLs to be added to the Bucket
+   *
+   * @throws IOException
+   */
+  void addBucketAcls(String volumeName, String bucketName,
+                     List<OzoneAcl> addAcls)
+      throws IOException;
+
+  /**
+   * Removes ACLs from a Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param removeAcls ACLs to be removed from the Bucket
+   *
+   * @throws IOException
+   */
+  void removeBucketAcls(String volumeName, String bucketName,
+                        List<OzoneAcl> removeAcls)
+      throws IOException;
+
+
+  /**
+   * Enables or disables Bucket Versioning.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param versioning Bucket versioning to be set
+   *
+   * @throws IOException
+   */
+  void setBucketVersioning(String volumeName, String bucketName,
+                           Versioning versioning)
+      throws IOException;
+
+  /**
+   * Sets the Storage Class of a Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param storageType StorageType to be set for the Bucket
+   *
+   * @throws IOException
+   */
+  void setBucketStorageType(String volumeName, String bucketName,
+                            StorageType storageType)
+      throws IOException;
+
+  /**
+   * Deletes a bucket if it is empty.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void deleteBucket(String volumeName, String bucketName)
+      throws IOException;
+
+  /**
+   * Checks that the bucket exists and the user has read access
+   * to the bucket, else throws Exception. Nothing is returned.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void checkBucketAccess(String volumeName, String bucketName)
+      throws IOException;
+
+  /**
+   * Returns {@link OzoneBucket}.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @return OzoneBucket
+   *
+   * @throws IOException
+   */
+  OzoneBucket getBucketDetails(String volumeName, String bucketName)
+        throws IOException;
+
+  /**
+   * Returns the List of Buckets in the Volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketPrefix Bucket prefix to match
+   *
+   * @return Iterator of {@link OzoneBucket}
+   *
+   * @throws IOException
+   */
+  Iterator<OzoneBucket> listBuckets(String volumeName, String bucketPrefix)
+      throws IOException;
+
+  /**
+   * Writes a key in an existing bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   * @param size Size of the data
+   *
+   * @return OzoneOutputStream
+   *
+   * @throws IOException
+   */
+  OzoneOutputStream createKey(String volumeName, String bucketName,
+                              String keyName, long size)
+      throws IOException;
+
+  /**
+   * Reads a key from an existing bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   *
+   * @return OzoneInputStream
+   *
+   * @throws IOException
+   */
+  OzoneInputStream getKey(String volumeName, String bucketName, String keyName)
+      throws IOException;
+
+
+  /**
+   * Deletes an existing key.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   *
+   * @throws IOException
+   */
+  void deleteKey(String volumeName, String bucketName, String keyName)
+      throws IOException;
+
+
+  /**
+   * Returns list of {@link OzoneKey} in Volume/Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyPrefix Key prefix to match
+   *
+   * @return List of OzoneKey
+   *
+   * @throws IOException
+   */
+  List<OzoneKey> listKeys(String volumeName, String bucketName,
+                            String keyPrefix)
+      throws IOException;
+
+
+  /**
+   * Get OzoneKey.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Key name
+   *
+   * @return OzoneKey
+   *
+   * @throws IOException
+   */
+  OzoneKey getKeyDetails(String volumeName, String bucketName,
+                        String keyName)
+      throws IOException;
+
+  /**
+   * Close and release the resources.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
new file mode 100644
index 0000000..b098be9
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.rest.OzoneRestClient;
+import org.apache.hadoop.ozone.client.rpc.OzoneRpcClient;
+
+import java.io.IOException;
+
+/**
+ * Factory class to create different types of OzoneClients.
+ *
+ * The configuration handed to the created clients is kept in a static
+ * field; both accessors of that field synchronize on this class.
+ */
+public final class OzoneClientFactory {
+
+  /**
+   * Private constructor, class is not meant to be initialized.
+   */
+  private OzoneClientFactory(){}
+
+  /** Configuration passed to every client created by this factory. */
+  private static Configuration configuration;
+
+  /**
+   * Returns an OzoneClient which will use RPC protocol to perform
+   * client operations.
+   *
+   * @return OzoneClient
+   * @throws IOException
+   */
+  public static OzoneClient getClient() throws IOException {
+    //TODO: get client based on ozone.client.protocol
+    return new OzoneRpcClient(getConfiguration());
+  }
+
+  /**
+   * Returns an OzoneClient which will use RPC protocol to perform
+   * client operations.
+   *
+   * @return OzoneClient
+   * @throws IOException
+   */
+  public static OzoneClient getRpcClient() throws IOException {
+    return new OzoneRpcClient(getConfiguration());
+  }
+
+  /**
+   * Returns an OzoneClient which will use REST protocol to perform
+   * client operations.
+   *
+   * @return OzoneClient
+   * @throws IOException
+   */
+  public static OzoneClient getRestClient() throws IOException {
+    return new OzoneRestClient(getConfiguration());
+  }
+
+  /**
+   * Sets the configuration, which will be used while creating OzoneClient.
+   *
+   * Synchronized on the same monitor as {@link #getConfiguration()} so that
+   * a configuration set by one thread is visible to clients created from
+   * another thread.
+   *
+   * @param conf configuration to use for clients created afterwards
+   */
+  public static synchronized void setConfiguration(Configuration conf) {
+    configuration = conf;
+  }
+
+  /**
+   * Returns the configuration if it's already set, else creates a new
+   * {@link OzoneConfiguration} and returns it.
+   *
+   * @return Configuration
+   */
+  private static synchronized Configuration getConfiguration() {
+    if (configuration == null) {
+      // Re-entrant: the class monitor is already held by this thread.
+      setConfiguration(new OzoneConfiguration());
+    }
+    return configuration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
new file mode 100644
index 0000000..9390a85
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -0,0 +1,707 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import com.google.common.base.Optional;
+
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSI_PORT_DEFAULT;
+
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_BIND_HOST_DEFAULT;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL_MS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
+
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
+
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL_MS;
+
+/**
+ * Utility methods for Ozone and Container Clients.
+ *
+ * The methods to retrieve SCM service endpoints assume there is a single
+ * SCM service instance. This will change when we switch to replicated service
+ * instances for redundancy.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class OzoneClientUtils {
+  /** Logger of this utility class. */
+  private static final Logger LOG = LoggerFactory.getLogger(
+      OzoneClientUtils.class);
+  // Presumably the marker for "no port present in an address" -- TODO
+  // confirm against the helpers that use it.
+  private static final int NO_PORT = -1;
+
+  /**
+   * The service ID of the solitary Ozone SCM service.
+   */
+  public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService";
+  /** Identifier string for the SCM service instance. */
+  public static final String OZONE_SCM_SERVICE_INSTANCE_ID =
+      "OzoneScmServiceInstance";
+
+  /** Private constructor: the class is never constructed. */
+  private OzoneClientUtils() {
+    // Never constructed
+  }
+
+  /**
+   * Retrieve the socket addresses of all storage container managers.
+   *
+   * Every entry of {@link ScmConfigKeys#OZONE_SCM_NAMES} is split into a
+   * host name and an optional port; entries without a port use
+   * {@link ScmConfigKeys#OZONE_SCM_DEFAULT_PORT}.
+   *
+   * @param conf configuration holding the SCM names
+   * @return A collection of SCM addresses
+   * @throws IllegalArgumentException If the name list is missing or empty,
+   *         or if no host name can be extracted from one of its entries
+   */
+  public static Collection<InetSocketAddress> getSCMAddresses(
+      Configuration conf) throws IllegalArgumentException {
+    Collection<InetSocketAddress> addresses =
+        new HashSet<InetSocketAddress>();
+    Collection<String> names =
+        conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
+    if (names == null || names.isEmpty()) {
+      throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
+          + " need to be a set of valid DNS names or IP addresses."
+          + " Null or empty address list found.");
+    }
+
+    for (String address : names) {
+      Optional<String> hostname = OzoneClientUtils.getHostName(address);
+      if (!hostname.isPresent()) {
+        // Report the configured entry itself: the Optional is absent here,
+        // so printing it would not tell the user which entry is wrong.
+        throw new IllegalArgumentException("Invalid hostname for SCM: "
+            + address);
+      }
+      Optional<Integer> port = OzoneClientUtils.getHostPort(address);
+      InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
+          port.or(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT));
+      addresses.add(addr);
+    }
+    return addresses;
+  }
+
+  /**
+   * Builds the address that clients use to reach the SCM, taking both host
+   * and port from {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY}. When
+   * the key yields no port,
+   * {@link ScmConfigKeys#OZONE_SCM_CLIENT_PORT_DEFAULT} is used.
+   *
+   * @param conf configuration to read the address from
+   * @return Target InetSocketAddress for the SCM client endpoint.
+   * @throws IllegalArgumentException if no host is found for the key
+   */
+  public static InetSocketAddress getScmAddressForClients(Configuration conf) {
+    final Optional<String> scmHost = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
+    if (!scmHost.isPresent()) {
+      throw new IllegalArgumentException(
+          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY
+          + " must be defined. See"
+          + " https://wiki.apache.org/hadoop/Ozone#Configuration for details"
+          + " on configuring Ozone.");
+    }
+
+    final Optional<Integer> scmPort = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
+    final String target = scmHost.get() + ":"
+        + scmPort.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT);
+    return NetUtils.createSocketAddr(target);
+  }
+
+  /**
+   * Retrieve the socket address that should be used by clients to connect
+   * to the SCM for block service. If
+   * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY} is not defined
+   * then the host of {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is
+   * used.
+   *
+   * The port is always read from
+   * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY}; when that key
+   * yields none, {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT}
+   * is used.
+   *
+   * @param conf configuration to read the address from
+   * @return Target InetSocketAddress for the SCM block client endpoint.
+   * @throws IllegalArgumentException if configuration is not defined.
+   */
+  public static InetSocketAddress getScmAddressForBlockClients(
+      Configuration conf) {
+    Optional<String> host = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
+
+    if (!host.isPresent()) {
+      // No block-client address: fall back to the host of the client address.
+      host = getHostNameFromConfigKeys(conf,
+          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
+      if (!host.isPresent()) {
+        throw new IllegalArgumentException(
+            ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY +
+                " must be defined. See" +
+                " https://wiki.apache.org/hadoop/Ozone#Configuration" +
+                " for details on configuring Ozone.");
+      }
+    }
+
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(host.get() + ":" +
+        port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
+  }
+
+  /**
+   * Builds the address that DataNodes use to reach the SCM.
+   *
+   * The host is looked up with
+   * {@link ScmConfigKeys#OZONE_SCM_DATANODE_ADDRESS_KEY} and
+   * {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY}, passed in that
+   * order. The port is read from the DataNode address key only and defaults
+   * to {@link ScmConfigKeys#OZONE_SCM_DATANODE_PORT_DEFAULT}.
+   *
+   * @param conf configuration to read the address from
+   * @return Target InetSocketAddress for the SCM service endpoint.
+   * @throws IllegalArgumentException if no host is found for either key
+   */
+  public static InetSocketAddress getScmAddressForDataNodes(
+      Configuration conf) {
+    // Host candidates, highest priority first:
+    //   1. OZONE_SCM_DATANODE_ADDRESS_KEY
+    //   2. OZONE_SCM_CLIENT_ADDRESS_KEY
+    final Optional<String> scmHost = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
+        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
+    if (!scmHost.isPresent()) {
+      throw new IllegalArgumentException(
+          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY
+          + " must be defined. See"
+          + " https://wiki.apache.org/hadoop/Ozone#Configuration for details"
+          + " on configuring Ozone.");
+    }
+
+    // Without a configured port the default DataNode port is used.
+    final Optional<Integer> scmPort = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
+    final String target = scmHost.get() + ":"
+        + scmPort.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT);
+    return NetUtils.createSocketAddr(target);
+  }
+
+  /**
+   * Builds the bind address of the SCM client endpoint.
+   *
+   * The host comes from
+   * {@link ScmConfigKeys#OZONE_SCM_CLIENT_BIND_HOST_KEY} (default
+   * {@link ScmConfigKeys#OZONE_SCM_CLIENT_BIND_HOST_DEFAULT}), the port
+   * from {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} (default
+   * {@link ScmConfigKeys#OZONE_SCM_CLIENT_PORT_DEFAULT}).
+   *
+   * @param conf configuration to read the address from
+   * @return Target InetSocketAddress for the SCM client endpoint.
+   */
+  public static InetSocketAddress getScmClientBindAddress(
+      Configuration conf) {
+    final Optional<String> bindHost = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY);
+    final Optional<Integer> bindPort = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
+
+    final String target =
+        bindHost.or(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":"
+            + bindPort.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT);
+    return NetUtils.createSocketAddr(target);
+  }
+
+  /**
+   * Builds the bind address of the SCM Block service client endpoint.
+   *
+   * The host comes from
+   * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY} (default
+   * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT}), the
+   * port from {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY}
+   * (default {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT}).
+   *
+   * @param conf configuration to read the address from
+   * @return Target InetSocketAddress for the SCM block client endpoint.
+   */
+  public static InetSocketAddress getScmBlockClientBindAddress(
+      Configuration conf) {
+    final Optional<String> bindHost = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY);
+    final Optional<Integer> bindPort = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
+
+    final String target =
+        bindHost.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT)
+            + ":"
+            + bindPort.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT);
+    return NetUtils.createSocketAddr(target);
+  }
+
+  /**
+   * Builds the socket address for the SCM DataNode-facing endpoint from
+   * configuration.
+   *
+   * The host comes from ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY and the
+   * port from the port component of
+   * ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY. Whichever part is not
+   * configured falls back to OZONE_SCM_DATANODE_BIND_HOST_DEFAULT /
+   * OZONE_SCM_DATANODE_PORT_DEFAULT.
+   *
+   * @param conf configuration to read the host and port from.
+   * @return Target InetSocketAddress for the SCM service endpoint.
+   */
+  public static InetSocketAddress getScmDataNodeBindAddress(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY);
+
+    // If the address key carries no port, OZONE_SCM_DATANODE_PORT_DEFAULT is used.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
+            port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+  }
+
+
+  /**
+   * Retrieve the socket address that is used by KSM.
+   *
+   * Both the host and the port are parsed from OZONE_KSM_ADDRESS_KEY; a
+   * missing host falls back to OZONE_KSM_BIND_HOST_DEFAULT and a missing port
+   * to OZONE_KSM_PORT_DEFAULT.
+   *
+   * @param conf configuration to read the KSM address from.
+   * @return Target InetSocketAddress for the KSM service endpoint.
+   */
+  public static InetSocketAddress getKsmAddress(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        OZONE_KSM_ADDRESS_KEY);
+
+    // If the address key carries no port, OZONE_KSM_PORT_DEFAULT is used.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        OZONE_KSM_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(OZONE_KSM_BIND_HOST_DEFAULT) + ":" +
+            port.or(OZONE_KSM_PORT_DEFAULT));
+  }
+
+  /**
+   * Retrieve the socket address that is used by CBlock Service.
+   *
+   * Both the host and the port are parsed from
+   * DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; a missing host falls back to
+   * DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT and a missing port to
+   * DFS_CBLOCK_SERVICERPC_PORT_DEFAULT.
+   *
+   * @param conf configuration to read the CBlock service address from.
+   * @return Target InetSocketAddress for the CBlock Service endpoint.
+   */
+  public static InetSocketAddress getCblockServiceRpcAddr(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
+
+    // If the address key carries no port, the default port is used.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
+            port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
+  }
+
+  /**
+   * Retrieve the socket address that is used by CBlock Server.
+   *
+   * Both the host and the port are parsed from
+   * DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; a missing port falls back to
+   * DFS_CBLOCK_JSCSI_PORT_DEFAULT.
+   *
+   * NOTE(review): a missing host falls back to
+   * DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT, i.e. the service-RPC hostname
+   * default rather than a JSCSI-specific one - confirm this is intended.
+   *
+   * @param conf configuration to read the CBlock server address from.
+   * @return Target InetSocketAddress for the CBlock Server endpoint.
+   */
+  public static InetSocketAddress getCblockServerRpcAddr(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
+
+    // If the address key carries no port, the default port is used.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
+            port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
+  }
+
+  /**
+   * Retrieve the hostname, trying the supplied config keys in order.
+   * Each config value may be absent, or if present in the format
+   * host:port (the :port part is optional).
+   *
+   * Each value is read with Configuration#getTrimmed and handed to
+   * {@link #getHostName(String)}; the first key that yields a hostname wins
+   * and later keys are not consulted.
+   *
+   * @param conf  - Conf
+   * @param keys a list of configuration key names.
+   *
+   * @return first hostname component found from the given keys, or absent.
+   * @throws IllegalArgumentException if any values are not in the 'host'
+   *             or host:port format.
+   */
+  public static Optional<String> getHostNameFromConfigKeys(Configuration conf,
+      String... keys) {
+    for (final String key : keys) {
+      final String value = conf.getTrimmed(key);
+      final Optional<String> hostName = getHostName(value);
+      if (hostName.isPresent()) {
+        return hostName;
+      }
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Extracts the host part of a host or host:port string.
+   *
+   * A null or empty value yields absent; any other value is parsed with
+   * HostAndPort.fromString and its host text is returned.
+   *
+   * @param value host or host:port
+   * @return the hostname, or absent when value is null or empty.
+   */
+  public static Optional<String> getHostName(String value) {
+    if ((value == null) || value.isEmpty()) {
+      return Optional.absent();
+    }
+    return Optional.of(HostAndPort.fromString(value).getHostText());
+  }
+
+  /**
+   * Extracts the port part of a host or host:port string.
+   *
+   * This method does not throw when the port is missing: a null or empty
+   * value, or a value without a port component (detected via the NO_PORT
+   * sentinel passed to getPortOrDefault), yields absent.
+   *
+   * @param value  String in host or host:port format.
+   * @return the port, or absent when no port is present.
+   */
+  public static Optional<Integer> getHostPort(String value) {
+    if((value == null) || value.isEmpty()) {
+      return Optional.absent();
+    }
+    int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT);
+    if (port == NO_PORT) {
+      return Optional.absent();
+    } else {
+      return Optional.of(port);
+    }
+  }
+
+  /**
+   * Retrieve the port number, trying the supplied config keys in order.
+   * Each config value may be absent, or if present in the format
+   * host:port (the :port part is optional).
+   *
+   * Each value is read with Configuration#getTrimmed and handed to
+   * {@link #getHostPort(String)}; the first key that yields a port wins and
+   * later keys are not consulted.
+   *
+   * @param conf Conf
+   * @param keys a list of configuration key names.
+   *
+   * @return first port number component found from the given keys, or absent.
+   * @throws IllegalArgumentException if any values are not in the 'host'
+   *             or host:port format.
+   */
+  public static Optional<Integer> getPortNumberFromConfigKeys(
+      Configuration conf, String... keys) {
+    for (final String key : keys) {
+      final String value = conf.getTrimmed(key);
+      final Optional<Integer> hostPort = getHostPort(value);
+      if (hostPort.isPresent()) {
+        return hostPort;
+      }
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Return the service addresses for the Ozone SCM. This method is used
+   * by the DataNodes to determine the service instances to connect to.
+   *
+   * The result is a two-level map: the outer map has a single entry keyed by
+   * OZONE_SCM_SERVICE_ID, whose value maps OZONE_SCM_SERVICE_INSTANCE_ID to
+   * the address returned by getScmAddressForDataNodes(conf).
+   *
+   * @param conf configuration to resolve the SCM address from.
+   * @return map of service id to (instance id to SCM service address).
+   */
+  public static Map<String, ? extends Map<String, InetSocketAddress>>
+      getScmServiceRpcAddresses(Configuration conf) {
+    final Map<String, InetSocketAddress> serviceInstances = new HashMap<>();
+    serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID,
+        getScmAddressForDataNodes(conf));
+
+    final Map<String, Map<String, InetSocketAddress>> services =
+        new HashMap<>();
+    services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
+    return services;
+  }
+
+  /**
+   * Checks that a given value is within a range derived from a base value.
+   *
+   * For example, sanitizeUserArgs(17, 3, 5, 10)
+   * ensures that 17 is greater/equal than 3 * 5 and less/equal to 3 * 10.
+   *
+   * Both bounds are inclusive and are computed with plain long
+   * multiplication.
+   *
+   * @param valueTocheck  - value to check
+   * @param baseValue     - the base value that is being used.
+   * @param minFactor     - range min - a 2 here makes us ensure that value
+   *                        valueTocheck is at least twice the baseValue.
+   * @param maxFactor     - range max
+   * @return valueTocheck unchanged, when it lies within the range.
+   * @throws IllegalArgumentException if valueTocheck is outside
+   *             [baseValue * minFactor, baseValue * maxFactor].
+   */
+  private static long sanitizeUserArgs(long valueTocheck, long baseValue,
+      long minFactor, long maxFactor)
+      throws IllegalArgumentException {
+    if ((valueTocheck >= (baseValue * minFactor)) &&
+        (valueTocheck <= (baseValue * maxFactor))) {
+      return valueTocheck;
+    }
+    String errMsg = String.format("%d is not within min = %d or max = " +
+        "%d", valueTocheck, baseValue * minFactor, baseValue * maxFactor);
+    throw new IllegalArgumentException(errMsg);
+  }
+
+  /**
+   * Returns the interval in which the heartbeat processor thread runs.
+   *
+   * Reads OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS as a long, falling back to
+   * ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT.
+   *
+   * @param conf - Configuration
+   * @return long in Milliseconds.
+   */
+  public static long getScmheartbeatCheckerInterval(Configuration conf) {
+    return conf.getLong(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
+        ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT);
+  }
+
+  /**
+   * Heartbeat Interval - Defines the heartbeat frequency from a datanode to
+   * SCM.
+   *
+   * Reads OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS as a time duration expressed
+   * in TimeUnit.SECONDS, falling back to
+   * ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT.
+   *
+   * @param conf - Ozone Config
+   * @return - HB interval in seconds.
+   */
+  public static long getScmHeartbeatInterval(Configuration conf) {
+    return conf.getTimeDuration(
+        OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
+        ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT,
+        TimeUnit.SECONDS);
+  }
+
+  /**
+   * Get the Stale Node interval, which is used by SCM to flag a datanode as
+   * stale, if the heartbeat from that node has been missing for this duration.
+   *
+   * @param conf - Configuration.
+   * @return - Long, Milliseconds to wait before flagging a node as stale.
+   */
+  public static long getStaleNodeInterval(Configuration conf) {
+
+    long staleNodeIntevalMs = conf.getLong(OZONE_SCM_STALENODE_INTERVAL_MS,
+        OZONE_SCM_STALENODE_INTERVAL_DEFAULT);
+
+    long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf);
+
+    long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000;
+
+
+    // Make sure that StaleNodeInterval is configured way above the frequency
+    // at which we run the heartbeat thread.
+    //
+    // Here we check that staleNodeInterval is at least five times more than 
the
+    // frequency at which the accounting thread is going to run.
+    try {
+      sanitizeUserArgs(staleNodeIntevalMs, heartbeatThreadFrequencyMs, 5, 
1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Stale Node Interval MS is cannot be honored due to " +
+              "mis-configured {}. ex:  {}",
+          OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, ex);
+      throw ex;
+    }
+
+    // Make sure that stale node value is greater than configured value that
+    // datanodes are going to send HBs.
+    try {
+      sanitizeUserArgs(staleNodeIntevalMs, heartbeatIntervalMs, 3, 1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Stale Node Interval MS is cannot be honored due to " +
+              "mis-configured {}. ex:  {}",
+          OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, ex);
+      throw ex;
+    }
+    return staleNodeIntevalMs;
+  }
+
+  /**
+   * Gets the interval for dead node flagging. This has to be a value that is
+   * greater than stale node value,  and by transitive relation we also know
+   * that this value is greater than heartbeat interval and heartbeatProcess
+   * Interval.
+   *
+   * The stale node interval is resolved (and validated) first via
+   * {@link #getStaleNodeInterval(Configuration)}; the configured dead node
+   * interval (OZONE_SCM_DEADNODE_INTERVAL_MS) must then be between 2 and 1000
+   * times that value.
+   *
+   * @param conf - Configuration.
+   * @return - the interval for dead node flagging.
+   * @throws IllegalArgumentException if the stale node interval is invalid or
+   *             the dead node interval is outside the range described above.
+   */
+  public static long getDeadNodeInterval(Configuration conf) {
+    long staleNodeIntervalMs = getStaleNodeInterval(conf);
+    long deadNodeIntervalMs = conf.getLong(
+        OZONE_SCM_DEADNODE_INTERVAL_MS, OZONE_SCM_DEADNODE_INTERVAL_DEFAULT);
+
+    try {
+      // Make sure that dead nodes Ms is at least twice the time for staleNodes
+      // with a max of 1000 times the staleNodes.
+      sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000);
+    } catch (IllegalArgumentException ex) {
+      // The exception is passed as the trailing argument so the logger
+      // records it as a throwable instead of binding it to a placeholder.
+      LOG.error("Dead Node Interval cannot be honored due to " +
+              "mis-configured {}.",
+          OZONE_SCM_STALENODE_INTERVAL_MS, ex);
+      throw ex;
+    }
+    return deadNodeIntervalMs;
+  }
+
+  /**
+   * Returns the maximum number of heartbeat to process per loop of the process
+   * thread.
+   *
+   * Reads ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS as an int, falling
+   * back to ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT.
+   *
+   * @param conf Configuration
+   * @return - int -- Number of HBs to process
+   */
+  public static int getMaxHBToProcessPerLoop(Configuration conf) {
+    return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
+        ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
+  }
+
+  /**
+   * Timeout value for the RPC from Datanode to SCM, primarily used for
+   * Heartbeats and container reports.
+   *
+   * Reads OZONE_SCM_HEARTBEAT_RPC_TIMEOUT as a time duration expressed in
+   * TimeUnit.MILLISECONDS, falling back to
+   * OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT.
+   *
+   * @param conf - Ozone Config
+   * @return - Rpc timeout in Milliseconds.
+   */
+  public static long getScmRpcTimeOutInMilliseconds(Configuration conf) {
+    return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
+        OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Log Warn interval.
+   *
+   * Reads OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT as an int, falling back
+   * to OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT. Judging by the key name the
+   * value is a count rather than a duration - TODO confirm with callers.
+   *
+   * @param conf - Ozone Config
+   * @return - Log warn interval.
+   */
+  public static int getLogWarnInterval(Configuration conf) {
+    return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
+        OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT);
+  }
+
+  /**
+   * Returns the Container port.
+   *
+   * Reads ScmConfigKeys.DFS_CONTAINER_IPC_PORT as an int, falling back to
+   * ScmConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT.
+   *
+   * @param conf - Conf
+   * @return port number.
+   */
+  public static int getContainerPort(Configuration conf) {
+    return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT, ScmConfigKeys
+        .DFS_CONTAINER_IPC_PORT_DEFAULT);
+  }
+
+  /**
+   * After starting an RPC server, updates configuration with the actual
+   * listening address of that server. The listening address may be different
+   * from the configured address if, for example, the configured address uses
+   * port 0 to request use of an ephemeral port.
+   *
+   * Delegates to updateListenAddress, passing the server's listener address
+   * as the real listening address.
+   *
+   * @param conf configuration to update
+   * @param rpcAddressKey configuration key for RPC server address
+   * @param addr configured address
+   * @param rpcServer started RPC server.
+   * @return the address returned by updateListenAddress.
+   */
+  public static InetSocketAddress updateRPCListenAddress(
+      OzoneConfiguration conf, String rpcAddressKey,
+      InetSocketAddress addr, RPC.Server rpcServer) {
+    return updateListenAddress(conf, rpcAddressKey, addr,
+        rpcServer.getListenerAddress());
+  }
+
+  /**
+   * After starting a server, updates configuration with the actual
+   * listening address of that server. The listening address may be different
+   * from the configured address if, for example, the configured address uses
+   * port 0 to request use of an ephemeral port.
+   *
+   * Only the port is taken from the real listening address; the host is
+   * always the host string of the configured address.
+   *
+   * @param conf       configuration to update
+   * @param addressKey configuration key for RPC server address
+   * @param addr       configured address
+   * @param listenAddr the real listening address.
+   * @return the configured host combined with the actual listening port; the
+   *         same host:port pair is written to conf under addressKey.
+   */
+  public static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
+      String addressKey, InetSocketAddress addr,
+      InetSocketAddress listenAddr) {
+    final String host = addr.getHostString();
+    final int port = listenAddr.getPort();
+    InetSocketAddress updatedAddr = new InetSocketAddress(host, port);
+    conf.set(addressKey, host + ":" + port);
+    return updatedAddr;
+  }
+
+  /**
+   * Releases a http connection if the request is not null.
+   *
+   * @param request the request whose connection should be released; a null
+   *                request is ignored.
+   */
+  public static void releaseConnection(HttpRequestBase request) {
+    if (request != null) {
+      request.releaseConnection();
+    }
+  }
+
+  /**
+   * Creates a client with default settings by delegating to
+   * {@link #newHttpClient(Configuration)} with a null configuration.
+   *
+   * @return a default instance of {@link CloseableHttpClient}.
+   */
+  public static CloseableHttpClient newHttpClient() {
+    return OzoneClientUtils.newHttpClient(null);
+  }
+
+  /**
+   * Returns a {@link CloseableHttpClient} configured by given configuration.
+   * If conf is null, returns a default instance.
+   *
+   * The socket timeout is read from
+   * OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS and the connection timeout
+   * from OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS. When conf is null
+   * or a key is unset, the matching *_DEFAULT constant is used. Both values
+   * are applied through the client's default RequestConfig.
+   *
+   * @param conf configuration, may be null.
+   * @return a {@link CloseableHttpClient} instance.
+   */
+  public static CloseableHttpClient newHttpClient(Configuration conf) {
+    int socketTimeout = OzoneConfigKeys
+        .OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT;
+    int connectionTimeout = OzoneConfigKeys
+        .OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT;
+    if (conf != null) {
+      socketTimeout = conf.getInt(
+          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS,
+          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT);
+      connectionTimeout = conf.getInt(
+          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS,
+          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT);
+    }
+
+    CloseableHttpClient client = HttpClients.custom()
+        .setDefaultRequestConfig(
+            RequestConfig.custom()
+                .setSocketTimeout(socketTimeout)
+                .setConnectTimeout(connectionTimeout)
+                .build())
+        .build();
+    return client;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
new file mode 100644
index 0000000..368736a
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+
+import java.util.List;
+
+/**
+ * Client-side holder for the metadata of an Ozone key, built from KsmKeyInfo.
+ */
+public class OzoneKey {
+
+  /**
+   * Name of the Volume the Key belongs to.
+   */
+  private final String volumeName;
+  /**
+   * Name of the Bucket the Key belongs to.
+   */
+  private final String bucketName;
+  /**
+   * Name of the Key.
+   */
+  private final String keyName;
+  /**
+   * Size of the data, as reported by KsmKeyInfo#getDataSize().
+   */
+  private final long dataSize;
+
+  /**
+   * All the locations of this key, in an ordered list (stored by reference).
+   */
+  private final List<KsmKeyLocationInfo> keyLocations;
+  /**
+   * Constructs OzoneKey from KsmKeyInfo.
+   *
+   * @param ksmKeyInfo key info whose fields are read into this object.
+   */
+  public OzoneKey(KsmKeyInfo ksmKeyInfo) {
+    this.volumeName = ksmKeyInfo.getVolumeName();
+    this.bucketName = ksmKeyInfo.getBucketName();
+    this.keyName = ksmKeyInfo.getKeyName();
+    this.dataSize = ksmKeyInfo.getDataSize();
+    this.keyLocations = ksmKeyInfo.getKeyLocationList();
+  }
+
+  /**
+   * Returns Volume Name associated with the Key.
+   *
+   * @return volumeName
+   */
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns Bucket Name associated with the Key.
+   *
+   * @return bucketName
+   */
+  public String getBucketName(){
+    return bucketName;
+  }
+
+  /**
+   * Returns the Key Name.
+   *
+   * @return keyName
+   */
+  public String getKeyName() {
+    return keyName;
+  }
+
+  /**
+   * Returns the size of the data.
+   *
+   * @return dataSize
+   */
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  /**
+   * Returns the list of the key locations (the stored list, not a copy).
+   *
+   * @return key locations
+   */
+  public List<KsmKeyLocationInfo> getKeyLocations() {
+    return keyLocations;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
new file mode 100644
index 0000000..9c2ec3d
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.ozone.ksm.helpers.KsmOzoneAclMap;
+import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+
+/**
+ * Client-side holder for the properties of an Ozone volume (KsmVolumeArgs).
+ */
+public class OzoneVolume {
+
+  /**
+   * Admin Name of the Volume.
+   */
+  private final String adminName;
+  /**
+   * Owner of the Volume.
+   */
+  private final String ownerName;
+  /**
+   * Name of the Volume.
+   */
+  private final String volumeName;
+  /**
+   * Quota allocated for the Volume.
+   */
+  private final long quotaInBytes;
+  /**
+   * Volume ACLs, stored by reference as returned by KsmVolumeArgs#getAclMap().
+   */
+  private final KsmOzoneAclMap aclMap;
+
+  /**
+   * Constructs OzoneVolume from KsmVolumeArgs.
+   *
+   * @param ksmVolumeArgs volume args whose fields are read into this object.
+   */
+  public OzoneVolume(KsmVolumeArgs ksmVolumeArgs) {
+    this.adminName = ksmVolumeArgs.getAdminName();
+    this.ownerName = ksmVolumeArgs.getOwnerName();
+    this.volumeName = ksmVolumeArgs.getVolume();
+    this.quotaInBytes = ksmVolumeArgs.getQuotaInBytes();
+    this.aclMap = ksmVolumeArgs.getAclMap();
+  }
+
+  /**
+   * Returns Volume's admin name.
+   *
+   * @return adminName
+   */
+  public String getAdminName() {
+    return adminName;
+  }
+
+  /**
+   * Returns Volume's owner name.
+   *
+   * @return ownerName
+   */
+  public String getOwnerName() {
+    return ownerName;
+  }
+
+  /**
+   * Returns Volume name.
+   *
+   * @return volumeName
+   */
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns Quota allocated for the Volume in bytes.
+   *
+   * @return quotaInBytes
+   */
+  public long getQuota() {
+    return quotaInBytes;
+  }
+
+  /**
+   * Returns the ACL map associated with the Volume (the stored instance).
+   *
+   * @return aclMap
+   */
+  public KsmOzoneAclMap getAclMap() {
+    return aclMap;
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to