http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java
new file mode 100644
index 0000000..6fff47e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+/**
+ * Test archive creation and unpacking.
+ */
+public class TestArchive {
+  private static final int DIR_COUNT = 10;
+  private static final int SUB_DIR_COUNT = 3;
+  private static final int FILE_COUNT = 10;
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Rule
+  public TemporaryFolder outputFolder = new TemporaryFolder();
+
+  Checksum crc = new Adler32();
+
+  @Before
+  public void setUp() throws Exception {
+    Random r = new Random();
+    final int megaByte = 1024 * 1024;
+
+    for (int x = 0; x < DIR_COUNT; x++) {
+      File subdir = folder.newFolder(String.format("dir%d", x));
+      for (int y = 0; y < SUB_DIR_COUNT; y++) {
+        File targetDir = new File(subdir.getPath().concat(File.separator)
+            .concat(String.format("subdir%d%d", x, y)));
+        if(!targetDir.mkdirs()) {
+          throw new IOException("Failed to create subdirectory. " +
+              targetDir.toString());
+        }
+        for (int z = 0; z < FILE_COUNT; z++) {
+          Path temp = Paths.get(targetDir.getPath().concat(File.separator)
+              .concat(String.format("File%d.txt", z)));
+          byte[] buf = RandomStringUtils.randomAlphanumeric(r.nextInt(megaByte))
+              .getBytes("UTF-8");
+          Files.write(temp, buf);
+          crc.update(buf, 0, buf.length);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testArchive() throws Exception {
+    Checksum readCrc = new Adler32();
+    File archiveFile = new File(outputFolder.getRoot() + File.separator
+        + "test.container.zip");
+    long zipCheckSum = FileUtil.zip(folder.getRoot(), archiveFile);
+    Assert.assertTrue(zipCheckSum > 0);
+    File decomp = new File(outputFolder.getRoot() + File.separator +
+        "decompress");
+    if (!decomp.exists() && !decomp.mkdirs()) {
+      throw new IOException("Unable to create the destination directory. " +
+          decomp.getPath());
+    }
+
+    FileUtil.unZip(archiveFile, decomp);
+    String[] patterns = {"txt"};
+    Iterator<File> iter = FileUtils.iterateFiles(decomp, patterns, true);
+    int count = 0;
+    while (iter.hasNext()) {
+      count++;
+      byte[] buf = Files.readAllBytes(iter.next().toPath());
+      readCrc.update(buf, 0, buf.length);
+    }
+    Assert.assertEquals(DIR_COUNT * SUB_DIR_COUNT * FILE_COUNT, count);
+    Assert.assertEquals(crc.getValue(), readCrc.getValue());
+  }
+}
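
Note: the test above leans on FileUtil.zip returning a checksum of the archived data and FileUtil.unZip restoring the tree, as exercised in testArchive(). A minimal round-trip sketch of that pattern follows; the paths and sample file name are illustrative, not part of the patch:

    import org.apache.hadoop.fs.FileUtil;
    import java.io.File;
    import java.nio.file.Files;
    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    public class ZipRoundTripSketch {
      public static void main(String[] args) throws Exception {
        File srcDir = new File("/tmp/zip-src");              // illustrative input tree
        File archive = new File("/tmp/test.container.zip");  // archive destination
        File destDir = new File("/tmp/zip-dest");            // extraction target

        // FileUtil.zip returns a checksum of the archived data, as the test asserts.
        long zipChecksum = FileUtil.zip(srcDir, archive);
        FileUtil.unZip(archive, destDir);

        // Recompute a checksum over an extracted file to detect corruption, the way
        // the test folds every File*.txt into an Adler32 sum. Hypothetical path.
        Checksum crc = new Adler32();
        byte[] buf = Files.readAllBytes(
            new File(destDir, "dir0/subdir00/File0.txt").toPath());
        crc.update(buf, 0, buf.length);
        System.out.println("zip=" + zipChecksum + " read=" + crc.getValue());
      }
    }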

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java
new file mode 100644
index 0000000..9c480d6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm;
+/**
+ * Test cases for SCM client classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
deleted file mode 100644
index 35b8961..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.cblock;
-
-import static java.lang.Thread.NORM_PRIORITY;
-
-/**
- * This class contains constants for configuration keys used in CBlock.
- */
-public final class CBlockConfigKeys {
-  public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY =
-      "dfs.cblock.servicerpc-address";
-  public static final int DFS_CBLOCK_SERVICERPC_PORT_DEFAULT =
-      9810;
-  public static final String DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT =
-      "0.0.0.0";
-
-  public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY =
-      "dfs.cblock.jscsi-address";
-
-  // The port on the CBlockManager node that jSCSI queries
-  public static final String DFS_CBLOCK_JSCSI_PORT_KEY =
-      "dfs.cblock.jscsi.port";
-  public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT =
-      9811;
-
-  public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY =
-      "dfs.cblock.service.rpc-bind-host";
-  public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY =
-      "dfs.cblock.jscsi.rpc-bind-host";
-
-  // default block size is 4KB
-  public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT =
-      4096;
-
-  public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY =
-      "dfs.storage.service.handler.count";
-  public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10;
-
-  public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY =
-      "dfs.cblock.service.leveldb.path";
-  //TODO : find a better place
-  public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT =
-      "/tmp/cblock_levelDB.dat";
-
-
-  public static final String DFS_CBLOCK_DISK_CACHE_PATH_KEY =
-      "dfs.cblock.disk.cache.path";
-  public static final String DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT =
-      "/tmp/cblockCacheDB";
-  /**
-   * Setting this flag to true makes the block layer compute a sha256 hash of
-   * the data and log that information along with block ID. This is very
-   * useful for doing trace based simulation of various workloads. Since it is
-   * computing a hash for each block this could be expensive, hence default
-   * is false.
-   */
-  public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io";
-  public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false;
-
-  public static final String DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO =
-      "dfs.cblock.short.circuit.io";
-  public static final boolean DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT =
-      false;
-
-  /**
-   * Cache size in 1000s of entries. 256 indicates 256 * 1024.
-   */
-  public static final String DFS_CBLOCK_CACHE_QUEUE_SIZE_KB =
-      "dfs.cblock.cache.cache.size.in.kb";
-  public static final int DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT = 256;
-
-  /**
-   *  Minimum Number of threads that cache pool will use for background I/O.
-   */
-  public static final String DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE =
-      "dfs.cblock.cache.core.min.pool.size";
-  public static final int DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT = 16;
-
-  /**
-   *  Maximum Number of threads that cache pool will use for background I/O.
-   */
-
-  public static final String DFS_CBLOCK_CACHE_MAX_POOL_SIZE =
-      "dfs.cblock.cache.max.pool.size";
-  public static final int DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT = 256;
-
-  /**
-   * Number of seconds to keep the Thread alive when it is idle.
-   */
-  public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS =
-      "dfs.cblock.cache.keep.alive.seconds";
-  public static final long DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT = 60;
-
-  /**
-   * Priority of cache flusher thread, affecting the relative performance of
-   * write and read.
-   */
-  public static final String DFS_CBLOCK_CACHE_THREAD_PRIORITY =
-      "dfs.cblock.cache.thread.priority";
-  public static final int DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT =
-      NORM_PRIORITY;
-
-  /**
-   * Block Buffer size in terms of blockID entries, 512 means 512 blockIDs.
-   */
-  public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE =
-      "dfs.cblock.cache.block.buffer.size";
-  public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
-
-  public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS =
-      "dfs.cblock.block.buffer.flush.interval.seconds";
-  public static final int
-      DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT = 60;
-
-  // jscsi server settings
-  public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY =
-      "dfs.cblock.jscsi.server.address";
-  public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT =
-      "127.0.0.1";
-  public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY =
-      "dfs.cblock.jscsi.cblock.server.address";
-  public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT =
-      "127.0.0.1";
-
-  // The SCM address that the cblock server should talk to
-  public static final String DFS_CBLOCK_SCM_IPADDRESS_KEY =
-      "dfs.cblock.scm.ipaddress";
-  public static final String DFS_CBLOCK_SCM_IPADDRESS_DEFAULT =
-      "127.0.0.1";
-  public static final String DFS_CBLOCK_SCM_PORT_KEY =
-      "dfs.cblock.scm.port";
-  public static final int DFS_CBLOCK_SCM_PORT_DEFAULT = 9860;
-
-  public static final String DFS_CBLOCK_CONTAINER_SIZE_GB_KEY =
-      "dfs.cblock.container.size";
-  public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT =
-      5;
-
-  // LevelDB cache file uses an off-heap cache in LevelDB of 256 MB.
-  public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY =
-      "dfs.cblock.cache.leveldb.cache.size.mb";
-  public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
-
-  /**
-   * The cache makes a best-effort attempt to write a block to a container.
-   * At some point we will need to handle the case where we have tried
-   * 64K times and are still unable to write to the container.
-   *
-   * TODO: We will need the cBlock Server to allow us to remap the block
-   * location in case of failures; at that point we should reduce the retry
-   * count to a more normal number. 64K retries amounts to approximately
-   * 18 hours of retrying.
-   */
-  public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY =
-      "dfs.cblock.cache.max.retry";
-  public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
-      64 * 1024;
-
-  private CBlockConfigKeys() {
-
-  }
-}
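
For context, callers consume these constants through the usual Hadoop Configuration key/default lookup; a minimal sketch of that pattern (the wrapper class is illustrative, and key strings are quoted inline since the constants class is deleted here):

    import org.apache.hadoop.conf.Configuration;

    public class CBlockConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fall back to the compiled-in defaults when a key is unset,
        // mirroring DFS_CBLOCK_JSCSI_PORT_{KEY,DEFAULT} above.
        int jscsiPort = conf.getInt("dfs.cblock.jscsi.port", 9811);
        // Mirrors DFS_CBLOCK_TRACE_IO and its default of false.
        boolean traceIo = conf.getBoolean("dfs.cblock.trace.io", false);
        System.out.println(jscsiPort + " " + traceIo);
      }
    }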

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
index 144c3bb..d7349cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
 import org.apache.hadoop.cblock.protocolPB
     .CBlockServiceProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.scm.XceiverClientManager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
index f70e8a4..90a16ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
index 5b76179..e6dff98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
@@ -31,12 +31,13 @@ import com.sun.jersey.api.container.ContainerFactory;
 import com.sun.jersey.api.core.ApplicationAdapter;
 
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ksm.protocolPB
+import org.apache.hadoop.ozone.ksm.protocolPB
     .KeySpaceManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
-import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+    .ScmBlockLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
index bfdecce..5114298 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
@@ -25,7 +25,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.HttpRequest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler;
-import org.apache.hadoop.ozone.web.headers.Header;
+import org.apache.hadoop.ozone.client.rest.headers.Header;
 import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
 import org.apache.hadoop.ozone.web.netty.RequestDispatchObjectStoreChannelHandler;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
index 725b20f..d2aaf73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
-import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java
deleted file mode 100644
index 51eefdc..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone;
-
-
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.OzoneConsts.Versioning;
-
-import java.util.List;
-
-/**
- * A class that encapsulates OzoneBucket.
- */
-public class OzoneBucket {
-
-  /**
-   * Name of the volume to which the bucket belongs.
-   */
-  private final String volumeName;
-  /**
-   * Name of the bucket.
-   */
-  private final String bucketName;
-  /**
-   * Bucket ACLs.
-   */
-  private final List<OzoneAcl> acls;
-
-  /**
-   * Type of storage to be used for this bucket.
-   * [RAM_DISK, SSD, DISK, ARCHIVE]
-   */
-  private final StorageType storageType;
-
-  /**
-   * Bucket Version flag.
-   */
-  private final Versioning versioning;
-
-
-  /**
-   * Constructs OzoneBucket from KsmBucketInfo.
-   *
-   * @param ksmBucketInfo
-   */
-  public OzoneBucket(KsmBucketInfo ksmBucketInfo) {
-    this.volumeName = ksmBucketInfo.getVolumeName();
-    this.bucketName = ksmBucketInfo.getBucketName();
-    this.acls = ksmBucketInfo.getAcls();
-    this.storageType = ksmBucketInfo.getStorageType();
-    this.versioning = ksmBucketInfo.getIsVersionEnabled() ?
-        Versioning.ENABLED : Versioning.DISABLED;
-  }
-
-  /**
-   * Returns Volume Name.
-   *
-   * @return volumeName
-   */
-  public String getVolumeName() {
-    return volumeName;
-  }
-
-  /**
-   * Returns Bucket Name.
-   *
-   * @return bucketName
-   */
-  public String getBucketName() {
-    return bucketName;
-  }
-
-  /**
-   * Returns ACL's associated with the Bucket.
-   *
-   * @return acls
-   */
-  public List<OzoneAcl> getAcls() {
-    return acls;
-  }
-
-  /**
-   * Returns StorageType of the Bucket.
-   *
-   * @return storageType
-   */
-  public StorageType getStorageType() {
-    return storageType;
-  }
-
-  /**
-   * Returns Versioning associated with the Bucket.
-   *
-   * @return versioning
-   */
-  public Versioning getVersioning() {
-    return versioning;
-  }
-
-}
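
The only logic in this class is the versioning translation in the constructor; isolated as a fragment (assuming the OzoneConsts.Versioning import used above):

    // Equivalent of the constructor's translation: KSM hands back a boolean
    // "version enabled" flag, which maps onto the Versioning enum; NOT_DEFINED
    // is never produced when reading a bucket back from KSM.
    static OzoneConsts.Versioning toVersioning(boolean isVersionEnabled) {
      return isVersionEnabled ? OzoneConsts.Versioning.ENABLED
          : OzoneConsts.Versioning.DISABLED;
    }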

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java
deleted file mode 100644
index dd52a57..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone;
-
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ozone.OzoneConsts.Versioning;
-import org.apache.hadoop.ozone.io.OzoneInputStream;
-import org.apache.hadoop.ozone.io.OzoneOutputStream;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * OzoneClient can connect to an Ozone cluster and
- * perform basic operations.
- */
-public interface OzoneClient {
-
-  /**
-   * Creates a new Volume.
-   *
-   * @param volumeName Name of the Volume
-   *
-   * @throws IOException
-   */
-  void createVolume(String volumeName)
-      throws IOException;
-
-  /**
-   * Creates a new Volume, with owner set.
-   *
-   * @param volumeName Name of the Volume
-   * @param owner Owner to be set for Volume
-   *
-   * @throws IOException
-   */
-  void createVolume(String volumeName, String owner)
-      throws IOException;
-
-  /**
-   * Creates a new Volume, with owner and ACLs set.
-   *
-   * @param volumeName Name of the Volume
-   * @param owner Owner to be set for Volume
-   * @param acls ACLs to be added to the Volume
-   *
-   * @throws IOException
-   */
-  void createVolume(String volumeName, String owner,
-                    OzoneAcl... acls)
-      throws IOException;
-
-  /**
-   * Creates a new Volume, with owner and quota set.
-   *
-   * @param volumeName Name of the Volume
-   * @param owner Owner to be set for Volume
-   * @param quota Volume Quota
-   *
-   * @throws IOException
-   */
-  void createVolume(String volumeName, String owner,
-                    long quota)
-      throws IOException;
-
-  /**
-   * Creates a new Volume, with owner and quota set.
-   *
-   * @param volumeName Name of the Volume
-   * @param owner Owner to be set for Volume
-   * @param quota Volume Quota
-   * @param acls ACLs to be added to the Volume
-   *
-   * @throws IOException
-   */
-  void createVolume(String volumeName, String owner,
-                    long quota, OzoneAcl... acls)
-      throws IOException;
-
-  /**
-   * Sets the owner of the volume.
-   *
-   * @param volumeName Name of the Volume
-   * @param owner to be set for the Volume
-   *
-   * @throws IOException
-   */
-  void setVolumeOwner(String volumeName, String owner) throws IOException;
-
-  /**
-   * Set Volume Quota.
-   *
-   * @param volumeName Name of the Volume
-   * @param quota Quota to be set for the Volume
-   *
-   * @throws IOException
-   */
-  void setVolumeQuota(String volumeName, long quota)
-      throws IOException;
-
-  /**
-   * Returns {@link OzoneVolume}.
-   *
-   * @param volumeName Name of the Volume
-   *
-   * @return OzoneVolume
-   *
-   * @throws IOException
-   */
-  OzoneVolume getVolumeDetails(String volumeName)
-      throws IOException;
-
-  /**
-   * Checks if a Volume exists and the user with a role specified has access
-   * to the Volume.
-   *
-   * @param volumeName Name of the Volume
-   * @param acl requested acls which needs to be checked for access
-   *
-   * @return Boolean - True if the user with a role can access the volume.
-   * This is possible for owners of the volume and admin users
-   *
-   * @throws IOException
-   */
-  boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
-      throws IOException;
-
-  /**
-   * Deletes an Empty Volume.
-   *
-   * @param volumeName Name of the Volume
-   *
-   * @throws IOException
-   */
-  void deleteVolume(String volumeName) throws IOException;
-
-  /**
-   * Returns the List of Volumes owned by current user.
-   *
-   * @param volumePrefix Volume prefix to match
-   *
-   * @return OzoneVolume Iterator
-   *
-   * @throws IOException
-   */
-  Iterator<OzoneVolume> listVolumes(String volumePrefix)
-      throws IOException;
-
-  /**
-   * Returns the List of Volumes owned by the specific user.
-   *
-   * @param volumePrefix Volume prefix to match
-   * @param user User Name
-   *
-   * @return OzoneVolume Iterator
-   *
-   * @throws IOException
-   */
-  Iterator<OzoneVolume> listVolumes(String volumePrefix, String user)
-      throws IOException;
-
-  /**
-   * Creates a new Bucket in the Volume.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @throws IOException
-   */
-  void createBucket(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Creates a new Bucket in the Volume, with versioning set.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param versioning Bucket versioning
-   *
-   * @throws IOException
-   */
-  void createBucket(String volumeName, String bucketName,
-                    Versioning versioning)
-      throws IOException;
-
-  /**
-   * Creates a new Bucket in the Volume, with storage type set.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param storageType StorageType for the Bucket
-   *
-   * @throws IOException
-   */
-  void createBucket(String volumeName, String bucketName,
-                    StorageType storageType)
-      throws IOException;
-
-  /**
-   * Creates a new Bucket in the Volume, with ACLs set.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param acls OzoneAcls for the Bucket
-   *
-   * @throws IOException
-   */
-  void createBucket(String volumeName, String bucketName,
-                           OzoneAcl... acls)
-      throws IOException;
-
-
-  /**
-   * Creates a new Bucket in the Volume, with versioning,
-   * storage type and ACLs set.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param versioning Versioning for the Bucket
-   * @param storageType StorageType for the Bucket
-   * @param acls OzoneAcls for the Bucket
-   *
-   * @throws IOException
-   */
-  void createBucket(String volumeName, String bucketName,
-                           OzoneConsts.Versioning versioning,
-                           StorageType storageType, OzoneAcl... acls)
-      throws IOException;
-
-  /**
-   * Adds ACLs to a Bucket.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @throws IOException
-   */
-  void addBucketAcls(String volumeName, String bucketName,
-                     List<OzoneAcl> addAcls)
-      throws IOException;
-
-  /**
-   * Removes ACLs from a Bucket.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @throws IOException
-   */
-  void removeBucketAcls(String volumeName, String bucketName,
-                        List<OzoneAcl> removeAcls)
-      throws IOException;
-
-
-  /**
-   * Enables or disables Bucket Versioning.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @throws IOException
-   */
-  void setBucketVersioning(String volumeName, String bucketName,
-                           Versioning versioning)
-      throws IOException;
-
-  /**
-   * Sets the StorageType of a Bucket.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @throws IOException
-   */
-  void setBucketStorageType(String volumeName, String bucketName,
-                            StorageType storageType)
-      throws IOException;
-
-  /**
-   * Deletes a bucket if it is empty.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @throws IOException
-   */
-  void deleteBucket(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Succeeds if the bucket exists and the user has read access
-   * to the bucket; otherwise throws an exception.
-   *
-   * @param volumeName Name of the Volume
-   *
-   * @throws IOException
-   */
-  void checkBucketAccess(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Returns {@link OzoneBucket}.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @return OzoneBucket
-   *
-   * @throws IOException
-   */
-  OzoneBucket getBucketDetails(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Returns the List of Buckets in the Volume.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketPrefix Bucket prefix to match
-   *
-   * @return OzoneBucket Iterator
-   *
-   * @throws IOException
-   */
-  Iterator<OzoneBucket> listBuckets(String volumeName, String bucketPrefix)
-      throws IOException;
-
-  /**
-   * Writes a key in an existing bucket.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param size Size of the data
-   *
-   * @return OutputStream
-   *
-   */
-  OzoneOutputStream createKey(String volumeName, String bucketName,
-                              String keyName, long size)
-      throws IOException;
-
-  /**
-   * Reads a key from an existing bucket.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @return OzoneInputStream
-   *
-   * @throws IOException
-   */
-  OzoneInputStream getKey(String volumeName, String bucketName, String keyName)
-      throws IOException;
-
-
-  /**
-   * Deletes an existing key.
-   *
-   * @param volumeName Name of the Volume
-   *
-   * @throws IOException
-   */
-  void deleteKey(String volumeName, String bucketName, String keyName)
-      throws IOException;
-
-
-  /**
-   * Returns list of {@link OzoneKey} in Volume/Bucket.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   *
-   * @return List of OzoneKeys
-   *
-   * @throws IOException
-   */
-  List<OzoneKey> listKeys(String volumeName, String bucketName,
-                            String keyPrefix)
-      throws IOException;
-
-
-  /**
-   * Get OzoneKey.
-   *
-   * @param volumeName Name of the Volume
-   * @param bucketName Name of the Bucket
-   * @param keyName Key name
-   *
-   * @return OzoneKey
-   *
-   * @throws IOException
-   */
-  OzoneKey getKeyDetails(String volumeName, String bucketName,
-                        String keyName)
-      throws IOException;
-
-  /**
-   * Close and release the resources.
-   */
-  void close() throws IOException;
-}
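
Taken together, the interface supports a volume -> bucket -> key flow; a hedged usage sketch against only the methods declared above (names and size are illustrative, and the Ozone streams are assumed to behave like ordinary java.io streams):

    import org.apache.hadoop.ozone.io.OzoneInputStream;
    import org.apache.hadoop.ozone.io.OzoneOutputStream;
    import java.io.IOException;

    final class OzoneClientFlowSketch {
      static void demo(OzoneClient client) throws IOException {
        client.createVolume("vol1", "owner1");   // volume with explicit owner
        client.createBucket("vol1", "bucket1");  // bucket with default settings

        byte[] data = "hello".getBytes("UTF-8");
        // Write a key; the declared size is passed up front.
        OzoneOutputStream out =
            client.createKey("vol1", "bucket1", "key1", data.length);
        out.write(data);
        out.close();

        // Read it back, then clean up.
        OzoneInputStream in = client.getKey("vol1", "bucket1", "key1");
        in.close();
        client.deleteKey("vol1", "bucket1", "key1");
        client.close();
      }
    }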

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java
deleted file mode 100644
index 7866f58..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-
-/**
- * Factory class to create different types of OzoneClients.
- */
-public final class OzoneClientFactory {
-
-  /**
-   * Private constructor, class is not meant to be initialized.
-   */
-  private OzoneClientFactory(){}
-
-  private static Configuration configuration;
-
-  /**
-   * Returns an OzoneClient which will use RPC protocol to perform
-   * client operations.
-   *
-   * @return OzoneClient
-   * @throws IOException
-   */
-  public static OzoneClient getRpcClient() throws IOException {
-    return new OzoneClientImpl(getConfiguration());
-  }
-
-  /**
-   * Sets the configuration, which will be used while creating OzoneClient.
-   *
-   * @param conf
-   */
-  public static void setConfiguration(Configuration conf) {
-    configuration = conf;
-  }
-
-  /**
-   * Returns the configuration if it's already set, else creates a new
-   * {@link OzoneConfiguration} and returns it.
-   *
-   * @return Configuration
-   */
-  private static synchronized Configuration getConfiguration() {
-    if(configuration == null) {
-      setConfiguration(new OzoneConfiguration());
-    }
-    return configuration;
-  }
-}
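
Usage of the factory is a two-step affair; a minimal sketch (the wrapper class is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.OzoneConfiguration;
    import java.io.IOException;

    final class FactorySketch {
      static OzoneClient connect() throws IOException {
        // Optional: inject a configuration before the first client is created;
        // otherwise getRpcClient() falls back to a fresh OzoneConfiguration.
        Configuration conf = new OzoneConfiguration();
        OzoneClientFactory.setConfiguration(conf);
        return OzoneClientFactory.getRpcClient();
      }
    }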

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
deleted file mode 100644
index feb4586..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
+++ /dev/null
@@ -1,570 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocolPB
-    .KeySpaceManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ksm.protocolPB
-    .KeySpaceManagerProtocolPB;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
-import org.apache.hadoop.ozone.io.OzoneInputStream;
-import org.apache.hadoop.ozone.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts.Versioning;
-import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
-import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.protocolPB
-    .StorageContainerLocationProtocolPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-/**
- * Ozone client implementation. It connects to KSM, SCM and DataNodes
- * to execute client calls, using the RPC protocol to communicate
- * with the servers.
- */
-public class OzoneClientImpl implements OzoneClient, Closeable {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OzoneClient.class);
-
-  private final StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-  private final KeySpaceManagerProtocolClientSideTranslatorPB
-      keySpaceManagerClient;
-  private final XceiverClientManager xceiverClientManager;
-  private final int chunkSize;
-
-
-  private final UserGroupInformation ugi;
-  private final OzoneAcl.OzoneACLRights userRights;
-  private final OzoneAcl.OzoneACLRights groupRights;
-
-  /**
-   * Creates OzoneClientImpl instance with new OzoneConfiguration.
-   *
-   * @throws IOException
-   */
-  public OzoneClientImpl() throws IOException {
-    this(new OzoneConfiguration());
-  }
-
-  /**
-   * Creates OzoneClientImpl instance with the given configuration.
-   *
-   * @param conf
-   *
-   * @throws IOException
-   */
-  public OzoneClientImpl(Configuration conf) throws IOException {
-    Preconditions.checkNotNull(conf);
-    this.ugi = UserGroupInformation.getCurrentUser();
-    this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
-        KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
-    this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
-        KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
-
-    long scmVersion =
-        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
-    InetSocketAddress scmAddress =
-        OzoneClientUtils.getScmAddressForClients(conf);
-    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-    this.storageContainerLocationClient =
-        new StorageContainerLocationProtocolClientSideTranslatorPB(
-            RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
-                scmAddress, UserGroupInformation.getCurrentUser(), conf,
-                NetUtils.getDefaultSocketFactory(conf),
-                Client.getRpcTimeout(conf)));
-
-    long ksmVersion =
-        RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
-    InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
-    RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
-        ProtobufRpcEngine.class);
-    this.keySpaceManagerClient =
-        new KeySpaceManagerProtocolClientSideTranslatorPB(
-            RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
-                ksmAddress, UserGroupInformation.getCurrentUser(), conf,
-                NetUtils.getDefaultSocketFactory(conf),
-                Client.getRpcTimeout(conf)));
-
-    this.xceiverClientManager = new XceiverClientManager(conf);
-
-    int configuredChunkSize = conf.getInt(
-        ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
-        ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
-    if(configuredChunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
-      LOG.warn("The chunk size ({}) is not allowed to be more than"
-              + " the maximum size ({}),"
-              + " resetting to the maximum size.",
-          configuredChunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
-      chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
-    } else {
-      chunkSize = configuredChunkSize;
-    }
-  }
-
-  @Override
-  public void createVolume(String volumeName)
-      throws IOException {
-    createVolume(volumeName, ugi.getUserName());
-  }
-
-  @Override
-  public void createVolume(String volumeName, String owner)
-      throws IOException {
-
-    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES,
-        (OzoneAcl[])null);
-  }
-
-  @Override
-  public void createVolume(String volumeName, String owner,
-                           OzoneAcl... acls)
-      throws IOException {
-    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, acls);
-  }
-
-  @Override
-  public void createVolume(String volumeName, String owner,
-                           long quota)
-      throws IOException {
-    createVolume(volumeName, owner, quota, (OzoneAcl[])null);
-  }
-
-  @Override
-  public void createVolume(String volumeName, String owner,
-                           long quota, OzoneAcl... acls)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(owner);
-    Preconditions.checkNotNull(quota);
-    Preconditions.checkState(quota >= 0);
-    OzoneAcl userAcl =
-        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
-            owner, userRights);
-    KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder();
-    builder.setAdminName(ugi.getUserName())
-        .setOwnerName(owner)
-        .setVolume(volumeName)
-        .setQuotaInBytes(quota)
-        .addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl));
-
-    List<OzoneAcl> listOfAcls = new ArrayList<>();
-
-    //Group ACLs of the User
-    List<String> userGroups = Arrays.asList(UserGroupInformation
-        .createRemoteUser(owner).getGroupNames());
-    userGroups.stream().forEach((group) -> listOfAcls.add(
-        new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights)));
-
-    //ACLs passed as argument
-    if(acls != null) {
-      listOfAcls.addAll(Arrays.asList(acls));
-    }
-
-    //Remove duplicates and set
-    for (OzoneAcl ozoneAcl :
-        listOfAcls.stream().distinct().collect(Collectors.toList())) {
-      builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(ozoneAcl));
-    }
-
-    LOG.info("Creating Volume: {}, with {} as owner and quota set to {} 
bytes.",
-        volumeName, owner, quota);
-    keySpaceManagerClient.createVolume(builder.build());
-  }
-
-  @Override
-  public void setVolumeOwner(String volumeName, String owner)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(owner);
-    keySpaceManagerClient.setOwner(volumeName, owner);
-  }
-
-  @Override
-  public void setVolumeQuota(String volumeName, long quota)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(quota);
-    Preconditions.checkState(quota >= 0);
-    keySpaceManagerClient.setQuota(volumeName, quota);
-  }
-
-  @Override
-  public OzoneVolume getVolumeDetails(String volumeName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    KsmVolumeArgs volumeArgs =
-        keySpaceManagerClient.getVolumeInfo(volumeName);
-    return new OzoneVolume(volumeArgs);
-  }
-
-  @Override
-  public boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    return keySpaceManagerClient.checkVolumeAccess(volumeName,
-        KSMPBHelper.convertOzoneAcl(acl));
-  }
-
-  @Override
-  public void deleteVolume(String volumeName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    keySpaceManagerClient.deleteVolume(volumeName);
-  }
-
-  @Override
-  public Iterator<OzoneVolume> listVolumes(String volumePrefix)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented.");
-  }
-
-  @Override
-  public Iterator<OzoneVolume> listVolumes(String volumePrefix,
-                                             String user)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented.");
-  }
-
-  @Override
-  public void createBucket(String volumeName, String bucketName)
-      throws IOException {
-    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
-        StorageType.DEFAULT, (OzoneAcl[])null);
-  }
-
-  @Override
-  public void createBucket(String volumeName, String bucketName,
-                           Versioning versioning)
-      throws IOException {
-    createBucket(volumeName, bucketName, versioning,
-        StorageType.DEFAULT, (OzoneAcl[])null);
-  }
-
-  @Override
-  public void createBucket(String volumeName, String bucketName,
-                           StorageType storageType)
-      throws IOException {
-    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
-        storageType, (OzoneAcl[])null);
-  }
-
-  @Override
-  public void createBucket(String volumeName, String bucketName,
-                           OzoneAcl... acls)
-      throws IOException {
-    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
-        StorageType.DEFAULT, acls);
-  }
-
-  @Override
-  public void createBucket(String volumeName, String bucketName,
-                           Versioning versioning, StorageType storageType,
-                           OzoneAcl... acls)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(versioning);
-    Preconditions.checkNotNull(storageType);
-
-    KsmBucketInfo.Builder builder = KsmBucketInfo.newBuilder();
-    builder.setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setStorageType(storageType)
-        .setIsVersionEnabled(getBucketVersioningProtobuf(
-        versioning));
-
-    String owner = ugi.getUserName();
-    final List<OzoneAcl> listOfAcls = new ArrayList<>();
-
-    //User ACL
-    OzoneAcl userAcl =
-        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
-            owner, userRights);
-    listOfAcls.add(userAcl);
-
-    //Group ACLs of the User
-    List<String> userGroups = Arrays.asList(UserGroupInformation
-        .createRemoteUser(owner).getGroupNames());
-    userGroups.stream().forEach((group) -> listOfAcls.add(
-        new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights)));
-
-    //ACLs passed as argument
-    if(acls != null) {
-      Arrays.stream(acls).forEach((acl) -> listOfAcls.add(acl));
-    }
-
-    //Remove duplicates and set
-    builder.setAcls(listOfAcls.stream().distinct()
-        .collect(Collectors.toList()));
-    LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
-        "Storage Type set to {}", volumeName, bucketName, versioning,
-        storageType);
-    keySpaceManagerClient.createBucket(builder.build());
-  }
-
-  /**
-   * Converts OzoneConsts.Versioning enum to boolean.
-   *
-   * @param version
-   * @return corresponding boolean value
-   */
-  private boolean getBucketVersioningProtobuf(
-      Versioning version) {
-    if(version != null) {
-      switch(version) {
-      case ENABLED:
-        return true;
-      case NOT_DEFINED:
-      case DISABLED:
-      default:
-        return false;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public void addBucketAcls(String volumeName, String bucketName,
-                            List<OzoneAcl> addAcls)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(addAcls);
-    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
-    builder.setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setAddAcls(addAcls);
-    keySpaceManagerClient.setBucketProperty(builder.build());
-  }
-
-  @Override
-  public void removeBucketAcls(String volumeName, String bucketName,
-                               List<OzoneAcl> removeAcls)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(removeAcls);
-    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
-    builder.setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setRemoveAcls(removeAcls);
-    keySpaceManagerClient.setBucketProperty(builder.build());
-  }
-
-  @Override
-  public void setBucketVersioning(String volumeName, String bucketName,
-                                  Versioning versioning)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(versioning);
-    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
-    builder.setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setIsVersionEnabled(getBucketVersioningFlag(
-            versioning));
-    keySpaceManagerClient.setBucketProperty(builder.build());
-  }
-
-  @Override
-  public void setBucketStorageType(String volumeName, String bucketName,
-                                   StorageType storageType)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(storageType);
-    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
-    builder.setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setStorageType(storageType);
-    keySpaceManagerClient.setBucketProperty(builder.build());
-  }
-
-  @Override
-  public void deleteBucket(String volumeName, String bucketName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    keySpaceManagerClient.deleteBucket(volumeName, bucketName);
-  }
-
-  @Override
-  public void checkBucketAccess(String volumeName, String bucketName)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented.");
-  }
-
-  @Override
-  public OzoneBucket getBucketDetails(String volumeName,
-                                        String bucketName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    KsmBucketInfo bucketInfo =
-        keySpaceManagerClient.getBucketInfo(volumeName, bucketName);
-    return new OzoneBucket(bucketInfo);
-  }
-
-  @Override
-  public Iterator<OzoneBucket> listBuckets(String volumeName,
-                                            String bucketPrefix)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented.");
-  }
-
-  @Override
-  public OzoneOutputStream createKey(String volumeName, String bucketName,
-                                     String keyName, long size)
-      throws IOException {
-    String requestId = UUID.randomUUID().toString();
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .setDataSize(size)
-        .build();
-
-    KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
-    ChunkGroupOutputStream  groupOutputStream =
-        ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
-        storageContainerLocationClient, chunkSize, requestId);
-    return new OzoneOutputStream(groupOutputStream);
-  }
-
-  @Override
-  public OzoneInputStream getKey(String volumeName, String bucketName,
-                                 String keyName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(keyName);
-    String requestId = UUID.randomUUID().toString();
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .build();
-    KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
-    LengthInputStream lengthInputStream =
-        ChunkGroupInputStream.getFromKsmKeyInfo(
-            keyInfo, xceiverClientManager, storageContainerLocationClient,
-            requestId);
-    return new OzoneInputStream(
-        (ChunkGroupInputStream) lengthInputStream.getWrappedStream());
-  }
-
-  @Override
-  public void deleteKey(String volumeName, String bucketName,
-                        String keyName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(keyName);
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .build();
-    keySpaceManagerClient.deleteKey(keyArgs);
-  }
-
-  @Override
-  public List<OzoneKey> listKeys(String volumeName, String bucketName,
-                                 String keyPrefix)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented.");
-  }
-
-  @Override
-  public OzoneKey getKeyDetails(String volumeName, String bucketName,
-                                String keyName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(keyName);
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .build();
-    KsmKeyInfo keyInfo =
-        keySpaceManagerClient.lookupKey(keyArgs);
-    return new OzoneKey(keyInfo);
-  }
-
-  /**
-   * Converts Versioning to its corresponding boolean flag.
-   *
-   * @param version the versioning setting; may be null
-   * @return true for ENABLED, false otherwise
-   */
-  private boolean getBucketVersioningFlag(
-      Versioning version) {
-    if (version != null) {
-      switch (version) {
-      case ENABLED:
-        return true;
-      case DISABLED:
-      case NOT_DEFINED:
-      default:
-        return false;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
-    IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
-    IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
-  }
-}
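
For orientation, here is a minimal round-trip sketch against the key methods
shown in the hunk above. The enclosing client class and the packages of
OzoneOutputStream/OzoneInputStream are not visible in this mail, so the
KeyOps interface below is a hypothetical stand-in that captures only the
visible signatures, using plain java.io stream types in place of the Ozone
wrappers; names like "vol1" are invented example values.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    class KeyRoundTripSketch {
      // Hypothetical stand-in for the client class above; in the real code
      // createKey returns an OzoneOutputStream (wrapping a
      // ChunkGroupOutputStream) and getKey returns an OzoneInputStream
      // (wrapping a ChunkGroupInputStream).
      interface KeyOps {
        OutputStream createKey(String volumeName, String bucketName,
            String keyName, long size) throws IOException;
        InputStream getKey(String volumeName, String bucketName,
            String keyName) throws IOException;
        void deleteKey(String volumeName, String bucketName, String keyName)
            throws IOException;
      }

      static void roundTrip(KeyOps client) throws IOException {
        byte[] payload = "hello ozone".getBytes(StandardCharsets.UTF_8);
        // Write: the size hint is what flows into KsmKeyArgs.setDataSize
        // in the implementation above.
        try (OutputStream out =
                 client.createKey("vol1", "bucket1", "key1", payload.length)) {
          out.write(payload);
        }
        // Read the key back, then delete it.
        try (InputStream in = client.getKey("vol1", "bucket1", "key1")) {
          byte[] readBack = new byte[payload.length];
          int read = in.read(readBack, 0, readBack.length);
        }
        client.deleteKey("vol1", "bucket1", "key1");
      }
    }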

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
deleted file mode 100644
index d92f49f..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
+++ /dev/null
@@ -1,705 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.ozone;
-
-import com.google.common.base.Optional;
-
-import com.google.common.net.HostAndPort;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_JSCSI_PORT_DEFAULT;
-
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_BIND_HOST_DEFAULT;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_DEADNODE_INTERVAL_MS;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
-
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
-
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_STALENODE_INTERVAL_MS;
-
-/**
- * Utility methods for Ozone and Container Clients.
- *
- * The methods to retrieve SCM service endpoints assume there is a single
- * SCM service instance. This will change when we switch to replicated service
- * instances for redundancy.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public final class OzoneClientUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      OzoneClientUtils.class);
-  private static final int NO_PORT = -1;
-
-  /**
-   * The service ID of the solitary Ozone SCM service.
-   */
-  public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService";
-  public static final String OZONE_SCM_SERVICE_INSTANCE_ID =
-      "OzoneScmServiceInstance";
-
-  private OzoneClientUtils() {
-    // Never constructed
-  }
-
-  /**
-   * Retrieve the socket addresses of all storage container managers.
-   *
-   * @param conf
-   * @return A collection of SCM addresses
-   * @throws IllegalArgumentException If the configuration is invalid
-   */
-  public static Collection<InetSocketAddress> getSCMAddresses(
-      Configuration conf) throws IllegalArgumentException {
-    Collection<InetSocketAddress> addresses = new HashSet<>();
-    Collection<String> names =
-        conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
-    if (names == null || names.isEmpty()) {
-      throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
-          + " needs to be a set of valid DNS names or IP addresses."
-          + " Null or empty address list found.");
-    }
-
-    final Optional<Integer> defaultPort =
-        Optional.of(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT);
-    for (String address : names) {
-      Optional<String> hostname = getHostName(address);
-      if (!hostname.isPresent()) {
-        throw new IllegalArgumentException("Invalid hostname for SCM: "
-            + address);
-      }
-      Optional<Integer> port = getHostPort(address);
-      InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
-          port.or(defaultPort.get()));
-      addresses.add(addr);
-    }
-    return addresses;
-  }
-    return addresses;
-  }
-
-  /**
-   * Retrieve the socket address that should be used by clients to connect
-   * to the SCM.
-   *
-   * @param conf
-   * @return Target InetSocketAddress for the SCM client endpoint.
-   */
-  public static InetSocketAddress getScmAddressForClients(Configuration conf) {
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
-
-    if (!host.isPresent()) {
-      throw new IllegalArgumentException(
-          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY +
-          " must be defined. See" +
-          " https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
-          " on configuring Ozone.");
-    }
-
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(host.get() + ":" +
-        port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
-  }
-
-  /**
-   * Retrieve the socket address that should be used by clients to connect
-   * to the SCM for block service. If
-   * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY} is not defined
-   * then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used.
-   *
-   * @param conf
-   * @return Target InetSocketAddress for the SCM block client endpoint.
-   * @throws IllegalArgumentException if configuration is not defined.
-   */
-  public static InetSocketAddress getScmAddressForBlockClients(
-      Configuration conf) {
-    Optional<String> host = getHostNameFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
-
-    if (!host.isPresent()) {
-      host = getHostNameFromConfigKeys(conf,
-          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
-      if (!host.isPresent()) {
-        throw new IllegalArgumentException(
-            ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY +
-            " must be defined. See" +
-            " https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
-            " on configuring Ozone.");
-      }
-    }
-
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(host.get() + ":" +
-        port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
-  }
-
-  /**
-   * Retrieve the socket address that should be used by DataNodes to connect
-   * to the SCM.
-   *
-   * @param conf
-   * @return Target InetSocketAddress for the SCM service endpoint.
-   */
-  public static InetSocketAddress getScmAddressForDataNodes(
-      Configuration conf) {
-    // We try the following settings in decreasing priority to retrieve the
-    // target host.
-    // - OZONE_SCM_DATANODE_ADDRESS_KEY
-    // - OZONE_SCM_CLIENT_ADDRESS_KEY
-    //
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
-        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
-
-    if (!host.isPresent()) {
-      throw new IllegalArgumentException(
-          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY +
-          " must be defined. See" +
-          " https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
-          " on configuring Ozone.");
-    }
-
-    // If no port number is specified then we'll just try the defaultBindPort.
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
-
-    InetSocketAddress addr = NetUtils.createSocketAddr(host.get() + ":" +
-        port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
-
-    return addr;
-  }
-
-  /**
-   * Retrieve the socket address that the SCM client-facing RPC server
-   * should bind to.
-   *
-   * @param conf
-   * @return InetSocketAddress for the SCM client endpoint to bind to.
-   */
-  public static InetSocketAddress getScmClientBindAddress(
-      Configuration conf) {
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY);
-
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(
-        host.or(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" +
-            port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
-  }
-
-  /**
-   * Retrieve the socket address that the SCM block-client RPC server
-   * should bind to.
-   *
-   * @param conf
-   * @return InetSocketAddress for the SCM block client endpoint to bind to.
-   */
-  public static InetSocketAddress getScmBlockClientBindAddress(
-      Configuration conf) {
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY);
-
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(
-        host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) +
-            ":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
-  }
-
-  /**
-   * Retrieve the socket address that the SCM DataNode-facing RPC server
-   * should bind to.
-   *
-   * @param conf
-   * @return InetSocketAddress for the SCM DataNode endpoint to bind to.
-   */
-  public static InetSocketAddress getScmDataNodeBindAddress(
-      Configuration conf) {
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY);
-
-    // If no port number is specified then we'll just try the defaultBindPort.
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(
-        host.or(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
-            port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
-  }
-
-
-  /**
-   * Retrieve the socket address that is used by KSM.
-   * @param conf
-   * @return Target InetSocketAddress for the KSM service endpoint.
-   */
-  public static InetSocketAddress getKsmAddress(
-      Configuration conf) {
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        OZONE_KSM_ADDRESS_KEY);
-
-    // If no port number is specified then we'll just try the defaultBindPort.
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        OZONE_KSM_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(
-        host.or(OZONE_KSM_BIND_HOST_DEFAULT) + ":" +
-            port.or(OZONE_KSM_PORT_DEFAULT));
-  }
-
-  /**
-   * Retrieve the socket address that is used by CBlock Service.
-   * @param conf
-   * @return Target InetSocketAddress for the CBlock Service endpoint.
-   */
-  public static InetSocketAddress getCblockServiceRpcAddr(
-      Configuration conf) {
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
-
-    // If no port number is specified then we'll just try the defaultBindPort.
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(
-        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
-            port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
-  }
-
-  /**
-   * Retrieve the socket address that is used by CBlock Server.
-   * @param conf
-   * @return Target InetSocketAddress for the CBlock Server endpoint.
-   */
-  public static InetSocketAddress getCblockServerRpcAddr(
-      Configuration conf) {
-    final Optional<String> host = getHostNameFromConfigKeys(conf,
-        DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
-
-    // If no port number is specified then we'll just try the defaultBindPort.
-    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
-        DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
-
-    return NetUtils.createSocketAddr(
-        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
-            port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
-  }
-
-  /**
-   * Retrieve the hostname, trying the supplied config keys in order.
-   * Each config value may be absent; if present, it must be in the format
-   * host:port (the :port part is optional).
-   *
-   * @param conf  - Conf
-   * @param keys a list of configuration key names.
-   *
-   * @return first hostname component found from the given keys, or absent.
-   * @throws IllegalArgumentException if any values are not in the 'host'
-   *             or host:port format.
-   */
-  public static Optional<String> getHostNameFromConfigKeys(Configuration conf,
-      String... keys) {
-    for (final String key : keys) {
-      final String value = conf.getTrimmed(key);
-      final Optional<String> hostName = getHostName(value);
-      if (hostName.isPresent()) {
-        return hostName;
-      }
-    }
-    return Optional.absent();
-  }
-
-  /**
-   * Gets the hostname, or absent if the value is null or empty.
-   * @param value host or host:port
-   * @return hostname, or absent
-   */
-  public static Optional<String> getHostName(String value) {
-    if ((value == null) || value.isEmpty()) {
-      return Optional.absent();
-    }
-    return Optional.of(HostAndPort.fromString(value).getHostText());
-  }
-
-  /**
-   * Gets the port if there is one, or absent otherwise.
-   * @param value  String in host:port format.
-   * @return Port, or absent if none was specified
-   */
-  public static Optional<Integer> getHostPort(String value) {
-    if ((value == null) || value.isEmpty()) {
-      return Optional.absent();
-    }
-    int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT);
-    if (port == NO_PORT) {
-      return Optional.absent();
-    } else {
-      return Optional.of(port);
-    }
-  }
-
-  /**
-   * Retrieve the port number, trying the supplied config keys in order.
-   * Each config value may be absent; if present, it must be in the format
-   * host:port (the :port part is optional).
-   *
-   * @param conf Conf
-   * @param keys a list of configuration key names.
-   *
-   * @return first port number component found from the given keys, or absent.
-   * @throws IllegalArgumentException if any values are not in the 'host'
-   *             or host:port format.
-   */
-  public static Optional<Integer> getPortNumberFromConfigKeys(
-      Configuration conf, String... keys) {
-    for (final String key : keys) {
-      final String value = conf.getTrimmed(key);
-      final Optional<Integer> hostPort = getHostPort(value);
-      if (hostPort.isPresent()) {
-        return hostPort;
-      }
-    }
-    return Optional.absent();
-  }
-
-  /**
-   * Return the list of service addresses for the Ozone SCM. This method is
-   * used by the DataNodes to determine the service instances to connect to.
-   *
-   * @param conf
-   * @return list of SCM service addresses.
-   */
-  public static Map<String, ? extends Map<String, InetSocketAddress>>
-      getScmServiceRpcAddresses(Configuration conf) {
-    final Map<String, InetSocketAddress> serviceInstances = new HashMap<>();
-    serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID,
-        getScmAddressForDataNodes(conf));
-
-    final Map<String, Map<String, InetSocketAddress>> services =
-        new HashMap<>();
-    services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
-    return services;
-  }
-
-  /**
-   * Checks that a given value is within a range.
-   *
-   * For example, sanitizeUserArgs(17, 3, 5, 10)
-   * ensures that 17 is greater than or equal to 3 * 5 and less than or
-   * equal to 3 * 10.
-   *
-   * @param valueToCheck  - value to check
-   * @param baseValue     - the base value that is being used.
-   * @param minFactor     - range min factor - a 2 here ensures that
-   *                        valueToCheck is at least twice the baseValue.
-   * @param maxFactor     - range max factor
-   * @return valueToCheck if it is within the range
-   */
-  private static long sanitizeUserArgs(long valueToCheck, long baseValue,
-      long minFactor, long maxFactor)
-      throws IllegalArgumentException {
-    if ((valueToCheck >= (baseValue * minFactor)) &&
-        (valueToCheck <= (baseValue * maxFactor))) {
-      return valueToCheck;
-    }
-    String errMsg = String.format("%d is not within the range [%d, %d]",
-        valueToCheck, baseValue * minFactor, baseValue * maxFactor);
-    throw new IllegalArgumentException(errMsg);
-  }
-
-  /**
-   * Returns the interval in which the heartbeat processor thread runs.
-   *
-   * @param conf - Configuration
-   * @return long in Milliseconds.
-   */
-  public static long getScmheartbeatCheckerInterval(Configuration conf) {
-    return conf.getLong(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
-        ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT);
-  }
-
-  /**
-   * Heartbeat Interval - Defines the heartbeat frequency from a datanode to
-   * SCM.
-   *
-   * @param conf - Ozone Config
-   * @return - HB interval in seconds.
-   */
-  public static long getScmHeartbeatInterval(Configuration conf) {
-    return conf.getTimeDuration(
-        OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
-        ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT,
-        TimeUnit.SECONDS);
-  }
-
-  /**
-   * Get the Stale Node interval, which is used by SCM to flag a datanode as
-   * stale, if the heartbeat from that node has been missing for this duration.
-   *
-   * @param conf - Configuration.
-   * @return - Long, Milliseconds to wait before flagging a node as stale.
-   */
-  public static long getStaleNodeInterval(Configuration conf) {
-
-    long staleNodeIntervalMs = conf.getLong(OZONE_SCM_STALENODE_INTERVAL_MS,
-        OZONE_SCM_STALENODE_INTERVAL_DEFAULT);
-
-    long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf);
-
-    long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000;
-
-    // Make sure that StaleNodeInterval is configured way above the frequency
-    // at which we run the heartbeat thread.
-    //
-    // Here we check that staleNodeInterval is at least five times the
-    // frequency at which the accounting thread is going to run.
-    try {
-      sanitizeUserArgs(staleNodeIntervalMs, heartbeatThreadFrequencyMs,
-          5, 1000);
-    } catch (IllegalArgumentException ex) {
-      LOG.error("Stale Node Interval MS cannot be honored due to " +
-              "mis-configured {}. ex: {}",
-          OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, ex);
-      throw ex;
-    }
-
-    // Make sure that the stale node value is greater than the configured
-    // interval at which datanodes send heartbeats.
-    try {
-      sanitizeUserArgs(staleNodeIntervalMs, heartbeatIntervalMs, 3, 1000);
-    } catch (IllegalArgumentException ex) {
-      LOG.error("Stale Node Interval MS cannot be honored due to " +
-              "mis-configured {}. ex: {}",
-          OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, ex);
-      throw ex;
-    }
-    return staleNodeIntervalMs;
-  }
-
-  /**
-   * Gets the interval for dead node flagging. This has to be a value that is
-   * greater than the stale node value, and by transitivity it is therefore
-   * also greater than the heartbeat interval and the heartbeat processing
-   * interval.
-   *
-   * @param conf - Configuration.
-   * @return - the interval for dead node flagging.
-   */
-  public static long getDeadNodeInterval(Configuration conf) {
-    long staleNodeIntervalMs = getStaleNodeInterval(conf);
-    long deadNodeIntervalMs = conf.getLong(
-        OZONE_SCM_DEADNODE_INTERVAL_MS, OZONE_SCM_DEADNODE_INTERVAL_DEFAULT);
-
-    try {
-      // Make sure that the dead node interval is at least twice the stale
-      // node interval, with a max of 1000 times the stale node interval.
-      sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000);
-    } catch (IllegalArgumentException ex) {
-      LOG.error("Dead Node Interval MS cannot be honored due to " +
-              "mis-configured {}. ex: {}",
-          OZONE_SCM_STALENODE_INTERVAL_MS, ex);
-      throw ex;
-    }
-    return deadNodeIntervalMs;
-  }
-
-  /**
-   * Returns the maximum number of heartbeats to process per loop of the
-   * process thread.
-   * @param conf Configuration
-   * @return - int -- Number of HBs to process
-   */
-  public static int getMaxHBToProcessPerLoop(Configuration conf) {
-    return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
-        ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
-  }
-
-  /**
-   * Timeout value for the RPC from Datanode to SCM, primarily used for
-   * Heartbeats and container reports.
-   *
-   * @param conf - Ozone Config
-   * @return - Rpc timeout in Milliseconds.
-   */
-  public static long getScmRpcTimeOutInMilliseconds(Configuration conf) {
-    return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
-        OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Log Warn interval.
-   *
-   * @param conf - Ozone Config
-   * @return - Log warn interval.
-   */
-  public static int getLogWarnInterval(Configuration conf) {
-    return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
-        OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT);
-  }
-
-  /**
-   * Returns the container port.
-   * @param conf - Conf
-   * @return port number.
-   */
-  public static int getContainerPort(Configuration conf) {
-    return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT, ScmConfigKeys
-        .DFS_CONTAINER_IPC_PORT_DEFAULT);
-  }
-
-  /**
-   * After starting an RPC server, updates configuration with the actual
-   * listening address of that server. The listening address may be different
-   * from the configured address if, for example, the configured address uses
-   * port 0 to request use of an ephemeral port.
-   *
-   * @param conf configuration to update
-   * @param rpcAddressKey configuration key for RPC server address
-   * @param addr configured address
-   * @param rpcServer started RPC server.
-   */
-  public static InetSocketAddress updateRPCListenAddress(
-      OzoneConfiguration conf, String rpcAddressKey,
-      InetSocketAddress addr, RPC.Server rpcServer) {
-    return updateListenAddress(conf, rpcAddressKey, addr,
-        rpcServer.getListenerAddress());
-  }
-
-  /**
-   * After starting a server, updates configuration with the actual
-   * listening address of that server. The listening address may be different
-   * from the configured address if, for example, the configured address uses
-   * port 0 to request use of an ephemeral port.
-   *
-   * @param conf       configuration to update
-   * @param addressKey configuration key for RPC server address
-   * @param addr       configured address
-   * @param listenAddr the real listening address.
-   */
-  public static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
-      String addressKey, InetSocketAddress addr, InetSocketAddress listenAddr) {
-    InetSocketAddress updatedAddr = new InetSocketAddress(addr.getHostString(),
-        listenAddr.getPort());
-    conf.set(addressKey,
-        addr.getHostString() + ":" + listenAddr.getPort());
-    return updatedAddr;
-  }
-
-  /**
-   * Releases an HTTP connection if the request is not null.
-   * @param request the request whose connection should be released
-   */
-  public static void releaseConnection(HttpRequestBase request) {
-    if (request != null) {
-      request.releaseConnection();
-    }
-  }
-
-  /**
-   * @return a default instance of {@link CloseableHttpClient}.
-   */
-  public static CloseableHttpClient newHttpClient() {
-    return OzoneClientUtils.newHttpClient(null);
-  }
-
-  /**
-   * Returns a {@link CloseableHttpClient} configured by the given configuration.
-   * If conf is null, returns a default instance.
-   *
-   * @param conf configuration
-   * @return a {@link CloseableHttpClient} instance.
-   */
-  public static CloseableHttpClient newHttpClient(Configuration conf) {
-    int socketTimeout = OzoneConfigKeys
-        .OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT;
-    int connectionTimeout = OzoneConfigKeys
-        .OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT;
-    if (conf != null) {
-      socketTimeout = conf.getInt(
-          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS,
-          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT);
-      connectionTimeout = conf.getInt(
-          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS,
-          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT);
-    }
-
-    CloseableHttpClient client = HttpClients.custom()
-        .setDefaultRequestConfig(
-            RequestConfig.custom()
-                .setSocketTimeout(socketTimeout)
-                .setConnectTimeout(connectionTimeout)
-                .build())
-        .build();
-    return client;
-  }
-}
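
For orientation, a minimal sketch of how the helpers in the file deleted
above were typically combined before this commit. The config keys, the
block-client fallback, and the interval ordering restated in the comments
all come from the hunk itself; the host and port are invented example
values, and the sketch only compiles against the pre-commit tree where
org.apache.hadoop.ozone.OzoneClientUtils still exists.

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.OzoneClientUtils;
    import org.apache.hadoop.scm.ScmConfigKeys;

    class ScmEndpointSketch {
      static void resolve() {
        // Invented example endpoint; only the client address key is set.
        Configuration conf = new Configuration();
        conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY,
            "scm.example.com:9860");

        // Resolved directly from OZONE_SCM_CLIENT_ADDRESS_KEY.
        InetSocketAddress clientAddr =
            OzoneClientUtils.getScmAddressForClients(conf);

        // Falls back to the client address key because
        // OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY is unset in this sketch.
        InetSocketAddress blockAddr =
            OzoneClientUtils.getScmAddressForBlockClients(conf);

        // The interval getters enforce an ordering via sanitizeUserArgs:
        // 5 * heartbeatCheckerInterval <= staleNodeInterval, and
        // 2 * staleNodeInterval <= deadNodeInterval
        //                       <= 1000 * staleNodeInterval.
        long staleMs = OzoneClientUtils.getStaleNodeInterval(conf);
        long deadMs = OzoneClientUtils.getDeadNodeInterval(conf);
      }
    }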

