HDFS-13424. Ozone: Refactor MiniOzoneClassicCluster. Contributed by Nanda Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06d228a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06d228a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06d228a3

Branch: refs/heads/HDFS-7240
Commit: 06d228a354b130c8a04c86a6647b52b24c886281
Parents: fd84dea
Author: Mukul Kumar Singh <msi...@apache.org>
Authored: Mon Apr 16 20:18:27 2018 +0530
Committer: Mukul Kumar Singh <msi...@apache.org>
Committed: Mon Apr 16 20:18:27 2018 +0530

----------------------------------------------------------------------
 .../hadoop/ozone/HddsDatanodeService.java       |  72 ++-
 hadoop-ozone/integration-test/pom.xml           |   6 +-
 .../container/TestContainerStateManager.java    |  12 +-
 .../hadoop/ozone/MiniOzoneClassicCluster.java   | 616 -------------------
 .../apache/hadoop/ozone/MiniOzoneCluster.java   | 291 ++++++++-
 .../hadoop/ozone/MiniOzoneClusterImpl.java      | 425 +++++++++++++
 .../hadoop/ozone/MiniOzoneTestHelper.java       |  81 ---
 .../apache/hadoop/ozone/RatisTestHelper.java    |  24 +-
 .../hadoop/ozone/TestContainerOperations.java   |   9 +-
 .../hadoop/ozone/TestMiniOzoneCluster.java      |  27 +-
 .../ozone/TestStorageContainerManager.java      |  34 +-
 .../TestStorageContainerManagerHelper.java      |  12 +-
 .../ozone/client/rest/TestOzoneRestClient.java  |   9 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java    |  10 +-
 .../TestCloseContainerHandler.java              |  22 +-
 .../container/ozoneimpl/TestOzoneContainer.java |  36 +-
 .../ozoneimpl/TestOzoneContainerRatis.java      |  19 +-
 .../container/ozoneimpl/TestRatisManager.java   |  18 +-
 .../hadoop/ozone/freon/TestDataValidate.java    |   7 +-
 .../apache/hadoop/ozone/freon/TestFreon.java    |  10 +-
 .../ozone/ksm/TestContainerReportWithKeys.java  |  20 +-
 .../apache/hadoop/ozone/ksm/TestKSMMetrcis.java |   5 +-
 .../apache/hadoop/ozone/ksm/TestKSMSQLCli.java  |  27 +-
 .../hadoop/ozone/ksm/TestKeySpaceManager.java   |   5 +-
 .../ksm/TestKeySpaceManagerRestInterface.java   |  23 +-
 .../ozone/ksm/TestKsmBlockVersioning.java       |   5 +-
 .../ksm/TestMultipleContainerReadWrite.java     |   5 +-
 .../hadoop/ozone/ozShell/TestOzoneShell.java    |  21 +-
 .../hadoop/ozone/scm/TestAllocateContainer.java |  13 +-
 .../hadoop/ozone/scm/TestContainerSQLCli.java   |  43 +-
 .../ozone/scm/TestContainerSmallFile.java       |  14 +-
 .../org/apache/hadoop/ozone/scm/TestSCMCli.java |  21 +-
 .../apache/hadoop/ozone/scm/TestSCMMXBean.java  |  14 +-
 .../apache/hadoop/ozone/scm/TestSCMMetrics.java |  23 +-
 .../ozone/scm/TestXceiverClientManager.java     |  18 +-
 .../ozone/scm/TestXceiverClientMetrics.java     |  12 +-
 .../hadoop/ozone/scm/node/TestQueryNode.java    |  19 +-
 .../ozone/web/TestDistributedOzoneVolumes.java  |  14 +-
 .../hadoop/ozone/web/TestLocalOzoneVolumes.java |  18 +-
 .../ozone/web/TestOzoneRestWithMiniCluster.java |  17 +-
 .../hadoop/ozone/web/TestOzoneWebAccess.java    |  14 +-
 .../hadoop/ozone/web/client/TestBuckets.java    |  14 +-
 .../hadoop/ozone/web/client/TestKeys.java       |  32 +-
 .../hadoop/ozone/web/client/TestKeysRatis.java  |   5 +-
 .../ozone/web/client/TestOzoneClient.java       |  16 +-
 .../hadoop/ozone/web/client/TestVolume.java     |  14 +-
 .../ozone/web/client/TestVolumeRatis.java       |  14 +-
 .../src/test/resources/log4j.properties         |  18 +
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |   6 +-
 hadoop-tools/hadoop-ozone/pom.xml               |  25 +
 .../hadoop/fs/ozone/TestOzoneFSInputStream.java |  20 +-
 .../fs/ozone/TestOzoneFileInterfaces.java       |  15 +-
 .../hadoop/fs/ozone/contract/OzoneContract.java |  15 +-
 53 files changed, 1140 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index fa0f50c..ce7ca6f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -51,18 +51,42 @@ public class HddsDatanodeService implements ServicePlugin {
       HddsDatanodeService.class);
 
 
-  private Configuration conf;
+  private OzoneConfiguration conf;
   private DatanodeDetails datanodeDetails;
   private DatanodeStateMachine datanodeStateMachine;
   private List<ServicePlugin> plugins;
 
+  /**
+   * Default constructor.
+   */
+  public HddsDatanodeService() {
+    this(null);
+  }
+
+  /**
+   * Constructs {@link HddsDatanodeService} using the provided {@code conf}
+   * value.
+   *
+   * @param conf OzoneConfiguration
+   */
+  public HddsDatanodeService(Configuration conf) {
+    if (conf == null) {
+      this.conf = new OzoneConfiguration();
+    } else {
+      this.conf = new OzoneConfiguration(conf);
+    }
+  }
+
+  /**
+   * Starts HddsDatanode services.
+   *
+   * @param service The service instance invoking this method
+   */
   @Override
   public void start(Object service) {
     OzoneConfiguration.activate();
     if (service instanceof Configurable) {
       conf = new OzoneConfiguration(((Configurable) service).getConf());
-    } else {
-      conf = new OzoneConfiguration();
     }
     if (HddsUtils.isHddsEnabled(conf)) {
       try {
@@ -109,6 +133,11 @@ public class HddsDatanodeService implements ServicePlugin {
       return DatanodeDetails.newBuilder().setUuid(datanodeUuid).build();
     }
   }
+
+  /**
+   * Starts all the service plugins which are configured using
+   * OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY.
+   */
   private void startPlugins() {
     try {
       plugins = conf.getInstances(HDDS_DATANODE_PLUGINS_KEY,
@@ -130,7 +159,12 @@ public class HddsDatanodeService implements ServicePlugin {
     }
   }
 
-  public Configuration getConf() {
+  /**
+   * Returns the OzoneConfiguration used by this HddsDatanodeService.
+   *
+   * @return OzoneConfiguration
+   */
+  public OzoneConfiguration getConf() {
     return conf;
   }
   /**
@@ -149,8 +183,13 @@ public class HddsDatanodeService implements ServicePlugin {
     return datanodeStateMachine;
   }
 
-  public void join() throws InterruptedException {
-    datanodeStateMachine.join();
+  public void join() {
+    try {
+      datanodeStateMachine.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted during HddsDatanodeService join.");
+    }
   }
 
   @Override
@@ -172,20 +211,31 @@ public class HddsDatanodeService implements ServicePlugin 
{
 
   @Override
   public void close() throws IOException {
+    if (plugins != null) {
+      for (ServicePlugin plugin : plugins) {
+        try {
+          plugin.close();
+        } catch (Throwable t) {
+          LOG.warn("ServicePlugin {} could not be closed", plugin, t);
+        }
+      }
+    }
   }
 
-  public static HddsDatanodeService createHddsDatanodeService(String args[]) {
-    StringUtils.startupShutdownMessage(HddsDatanodeService.class, args, LOG);
-    return new HddsDatanodeService();
+  public static HddsDatanodeService createHddsDatanodeService(
+      Configuration conf) {
+    return new HddsDatanodeService(conf);
   }
 
   public static void main(String args[]) {
     try {
-      HddsDatanodeService hddsDatanodeService = 
createHddsDatanodeService(args);
+      StringUtils.startupShutdownMessage(HddsDatanodeService.class, args, LOG);
+      HddsDatanodeService hddsDatanodeService =
+          createHddsDatanodeService(new OzoneConfiguration());
       hddsDatanodeService.start(null);
       hddsDatanodeService.join();
     } catch (Throwable e) {
-      LOG.error("Exception in while starting HddsDatanodeService.", e);
+      LOG.error("Exception in HddsDatanodeService.", e);
       terminate(1, e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/pom.xml 
b/hadoop-ozone/integration-test/pom.xml
index 9d975539..89bf928 100644
--- a/hadoop-ozone/integration-test/pom.xml
+++ b/hadoop-ozone/integration-test/pom.xml
@@ -61,7 +61,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
-
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.openjdk.jmh</groupId>
       <artifactId>jmh-core</artifactId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 3ed80b3..25754ac 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -18,15 +18,10 @@ package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.ContainerMapping;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.junit.After;
@@ -54,10 +49,10 @@ public class TestContainerStateManager {
 
 
   @Before
-  public void setup() throws IOException {
+  public void setup() throws Exception {
     conf = new OzoneConfiguration();
-    cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
     xceiverClientManager = new XceiverClientManager(conf);
     scm = cluster.getStorageContainerManager();
     scmContainerMapping = scm.getScmContainerManager();
@@ -68,7 +63,6 @@ public class TestContainerStateManager {
   public void cleanUp() {
     if (cluster != null) {
       cluster.shutdown();
-      cluster.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
deleted file mode 100644
index 183fe73..0000000
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone;
-
-import java.io.File;
-import java.util.Optional;
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.container.common
-    .statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
-import org.apache.hadoop.ozone.ksm.KeySpaceManager;
-import org.apache.hadoop.hdds.scm.SCMStorage;
-import org.apache.hadoop.ozone.ksm.KSMStorage;
-import org.apache.hadoop.ozone.web.client.OzoneRestClient;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.hadoop.ozone.MiniOzoneTestHelper.*;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .DFS_CONTAINER_IPC_PORT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .DFS_CONTAINER_IPC_RANDOM_PORT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .DFS_CONTAINER_RATIS_IPC_PORT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
-
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
-    .HEALTHY;
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
-import static org.junit.Assert.assertFalse;
-
-/**
- * MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
- * running tests.  The cluster consists of a StorageContainerManager, Namenode
- * and multiple DataNodes.  This class subclasses {@link MiniDFSCluster} for
- * convenient reuse of logic for starting DataNodes.
- */
-@InterfaceAudience.Private
-public final class MiniOzoneClassicCluster extends MiniDFSCluster
-    implements MiniOzoneCluster {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(MiniOzoneClassicCluster.class);
-  private static final String USER_AUTH = "hdfs";
-
-  private final OzoneConfiguration conf;
-  private final StorageContainerManager scm;
-  private KeySpaceManager ksm;
-  private final Path tempPath;
-
-  /**
-   * Creates a new MiniOzoneCluster.
-   *
-   * @param builder cluster builder
-   * @param scm     StorageContainerManager, already running
-   * @throws IOException if there is an I/O error
-   */
-  private MiniOzoneClassicCluster(Builder builder, StorageContainerManager scm,
-                           KeySpaceManager ksm)
-      throws IOException {
-    super(builder);
-    this.conf = builder.conf;
-    this.scm = scm;
-    this.ksm = ksm;
-    tempPath = Paths.get(builder.getPath(), builder.getRunID());
-  }
-
-
-  @Override
-  protected void setupDatanodeAddress(
-      int i, Configuration dnConf, boolean setupHostsFile,
-      boolean checkDnAddrConf) throws IOException {
-    super.setupDatanodeAddress(i, dnConf, setupHostsFile, checkDnAddrConf);
-    String path = GenericTestUtils.getTempPath(
-        MiniOzoneClassicCluster.class.getSimpleName() + "datanode");
-    dnConf.setStrings(ScmConfigKeys.OZONE_SCM_DATANODE_ID,
-        path + "/" + i + "-datanode.id");
-    setConf(i, dnConf, 
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
-        getInstanceStorageDir(i, -1).getCanonicalPath());
-    String containerMetaDirs = dnConf.get(
-        OzoneConfigKeys.OZONE_METADATA_DIRS) + "-dn-" + i;
-    Path containerMetaDirPath = Paths.get(containerMetaDirs);
-    setConf(i, dnConf, OzoneConfigKeys.OZONE_METADATA_DIRS,
-        containerMetaDirs);
-    Path containerRootPath =
-        containerMetaDirPath.resolve(OzoneConsts.CONTAINER_ROOT_PREFIX);
-    Files.createDirectories(containerRootPath);
-  }
-
-  static void setConf(int i, Configuration conf, String key, String value) {
-    conf.set(key, value);
-    LOG.info("dn{}: set {} = {}", i, key, value);
-  }
-
-  @Override
-  public void close() {
-    shutdown();
-    try {
-      FileUtils.deleteDirectory(tempPath.toFile());
-    } catch (IOException e) {
-      String errorMessage = "Cleaning up metadata directories failed." + e;
-      assertFalse(errorMessage, true);
-    }
-
-    try {
-      final String localStorage =
-          conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
-              OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
-      FileUtils.deleteDirectory(new File(localStorage));
-    } catch (IOException e) {
-      LOG.error("Cleaning up local storage failed", e);
-    }
-  }
-
-  @Override
-  public boolean restartDataNode(int i) throws IOException {
-    return restartDataNode(i, true);
-  }
-  /*
-   * Restart a particular datanode, wait for it to become active
-   */
-  @Override
-  public boolean restartDataNode(int i, boolean keepPort) throws IOException {
-    LOG.info("restarting datanode:{} keepPort:{}", i, keepPort);
-    if (keepPort) {
-      DataNodeProperties dnProp = dataNodes.get(i);
-      OzoneContainer container = getOzoneContainer(dnProp
-          .getDatanode());
-      Configuration config = dnProp.getConf();
-      int currentPort = container.getContainerServerPort();
-      config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
-      config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
-      int ratisPort = container.getRatisContainerServerPort();
-      config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
-      config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
-    }
-    boolean status =  super.restartDataNode(i, keepPort);
-
-    try {
-      this.waitActive();
-      this.waitFirstBRCompleted(0, 3000);
-      waitDatanodeOzoneReady(i);
-    } catch (TimeoutException | InterruptedException e) {
-      Thread.interrupted();
-    }
-    return status;
-  }
-
-  @Override
-  public void shutdown() {
-    super.shutdown();
-    LOG.info("Shutting down the Mini Ozone Cluster");
-
-    if (ksm != null) {
-      LOG.info("Shutting down the keySpaceManager");
-      ksm.stop();
-      ksm.join();
-    }
-
-    if (scm != null) {
-      LOG.info("Shutting down the StorageContainerManager");
-      scm.stop();
-      scm.join();
-    }
-  }
-
-  @Override
-  public StorageContainerManager getStorageContainerManager() {
-    return this.scm;
-  }
-
-  public OzoneConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public KeySpaceManager getKeySpaceManager() {
-    return this.ksm;
-  }
-
-  /**
-   * Creates an {@link OzoneRestClient} connected to this cluster's REST
-   * service. Callers take ownership of the client and must close it when done.
-   *
-   * @return OzoneRestClient connected to this cluster's REST service
-   * @throws OzoneException if Ozone encounters an error creating the client
-   */
-  @Override
-  public OzoneRestClient createOzoneRestClient() throws OzoneException {
-    Preconditions.checkState(!getDataNodes().isEmpty(),
-        "Cannot create OzoneRestClient if the cluster has no DataNodes.");
-    // An Ozone request may originate at any DataNode, so pick one at random.
-    int dnIndex = new Random().nextInt(getDataNodes().size());
-    String uri = String.format("http://127.0.0.1:%d",
-        MiniOzoneTestHelper.getOzoneRestPort(getDataNodes().get(dnIndex)));
-    LOG.info("Creating Ozone client to DataNode {} with URI {} and user {}",
-        dnIndex, uri, USER_AUTH);
-    try {
-      return new OzoneRestClient(uri, USER_AUTH);
-    } catch (URISyntaxException e) {
-      // We control the REST service URI, so it should never be invalid.
-      throw new IllegalStateException("Unexpected URISyntaxException", e);
-    }
-  }
-
-  /**
-   * Creates an RPC proxy connected to this cluster's StorageContainerManager
-   * for accessing container location information.  Callers take ownership of
-   * the proxy and must close it when done.
-   *
-   * @return RPC proxy for accessing container location information
-   * @throws IOException if there is an I/O error
-   */
-  @Override
-  public StorageContainerLocationProtocolClientSideTranslatorPB
-      createStorageContainerLocationClient() throws IOException {
-    long version = RPC.getProtocolVersion(
-        StorageContainerLocationProtocolPB.class);
-    InetSocketAddress address = scm.getClientRpcAddress();
-    LOG.info(
-        "Creating StorageContainerLocationProtocol RPC client with address {}",
-        address);
-    return new StorageContainerLocationProtocolClientSideTranslatorPB(
-        RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
-            address, UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf),
-            Client.getRpcTimeout(conf)));
-  }
-
-  /**
-   * Waits for the Ozone cluster to be ready for processing requests.
-   */
-  @Override
-  public void waitOzoneReady() throws TimeoutException, InterruptedException {
-    GenericTestUtils.waitFor(() -> {
-      final int healthy = scm.getNodeCount(HEALTHY);
-      final boolean isReady = healthy >= numDataNodes;
-      LOG.info("{}. Got {} of {} DN Heartbeats.",
-            isReady? "Cluster is ready" : "Waiting for cluster to be ready",
-            healthy, numDataNodes);
-      return isReady;
-    }, 1000, 60 * 1000); //wait for 1 min.
-  }
-
-  /**
-   * Waits for a particular Datanode to be ready for processing ozone requests.
-   */
-  @Override
-  public void waitDatanodeOzoneReady(int dnIndex)
-      throws TimeoutException, InterruptedException {
-    GenericTestUtils.waitFor(() -> {
-      DatanodeStateMachine.DatanodeStates state =
-          MiniOzoneTestHelper.getStateMachine(dataNodes.get(dnIndex)
-              .getDatanode()).getContext().getState();
-      final boolean rebootComplete =
-          (state == DatanodeStateMachine.DatanodeStates.RUNNING);
-      LOG.info("{} Current state:{}", rebootComplete, state);
-      return rebootComplete;
-    }, 1000, 60 * 1000); //wait for 1 min.
-  }
-
-  /**
-   * Waits for SCM to be out of Chill Mode. Many tests can be run iff we are 
out
-   * of Chill mode.
-   *
-   * @throws TimeoutException
-   * @throws InterruptedException
-   */
-  @Override
-  public void waitTobeOutOfChillMode() throws TimeoutException,
-      InterruptedException {
-    GenericTestUtils.waitFor(() -> {
-      if (scm.getScmNodeManager().isOutOfChillMode()) {
-        return true;
-      }
-      LOG.info("Waiting for cluster to be ready. No datanodes found");
-      return false;
-    }, 100, 45000);
-  }
-
-  @Override
-  public void waitForHeartbeatProcessed() throws TimeoutException,
-      InterruptedException {
-    GenericTestUtils.waitFor(() ->
-            scm.getScmNodeManager().waitForHeartbeatProcessed(), 100,
-        4 * 1000);
-    GenericTestUtils.waitFor(() ->
-            scm.getScmNodeManager().getStats().getCapacity().get() > 0, 100,
-        4 * 1000);
-  }
-
-  /**
-   * Builder for configuring the MiniOzoneCluster to run.
-   */
-  public static class Builder
-      extends MiniDFSCluster.Builder {
-
-    private final OzoneConfiguration conf;
-    private static final int DEFAULT_HB_SECONDS = 1;
-    private static final int DEFAULT_PROCESSOR_MS = 100;
-    private final String path;
-    private final UUID runID;
-    private Optional<String> ozoneHandlerType = java.util.Optional.empty();
-    private Optional<Boolean> enableTrace = Optional.of(false);
-    private Optional<Integer> hbSeconds = Optional.empty();
-    private Optional<Integer> hbProcessorInterval = Optional.empty();
-    private Optional<String> scmMetadataDir = Optional.empty();
-    private Optional<String> clusterId = Optional.empty();
-    private Optional<String> scmId = Optional.empty();
-    private Optional<String> ksmId = Optional.empty();
-    private Boolean ozoneEnabled = true;
-    private Boolean waitForChillModeFinish = true;
-    private Boolean randomContainerPort = true;
-    // Use relative smaller number of handlers for testing
-    private int numOfKsmHandlers = 20;
-    private int numOfScmHandlers = 20;
-
-    /**
-     * Creates a new Builder.
-     *
-     * @param conf configuration
-     */
-    public Builder(OzoneConfiguration conf) {
-      super(conf);
-      // Mini Ozone cluster will not come up if the port is not true, since
-      // Ratis will exit if the server port cannot be bound. We can remove this
-      // hard coding once we fix the Ratis default behaviour.
-      conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
-          true);
-      this.conf = conf;
-      path = GenericTestUtils.getTempPath(
-          MiniOzoneClassicCluster.class.getSimpleName() +
-          UUID.randomUUID().toString());
-      runID = UUID.randomUUID();
-    }
-
-    public Builder setRandomContainerPort(boolean randomPort) {
-      this.randomContainerPort = randomPort;
-      return this;
-    }
-
-    @Override
-    public Builder numDataNodes(int val) {
-      super.numDataNodes(val);
-      return this;
-    }
-
-    @Override
-    public Builder storageCapacities(long[] capacities) {
-      super.storageCapacities(capacities);
-      return this;
-    }
-
-    public Builder setHandlerType(String handler) {
-      ozoneHandlerType = Optional.of(handler);
-      return this;
-    }
-
-    public Builder setTrace(Boolean trace) {
-      enableTrace = Optional.of(trace);
-      return this;
-    }
-
-    public Builder setSCMHBInterval(int seconds) {
-      hbSeconds = Optional.of(seconds);
-      return this;
-    }
-
-    public Builder setSCMHeartbeatProcessingInterval(int milliseconds) {
-      hbProcessorInterval = Optional.of(milliseconds);
-      return this;
-    }
-
-    public Builder setSCMMetadataDir(String scmMetadataDirPath) {
-      scmMetadataDir = Optional.of(scmMetadataDirPath);
-      return this;
-    }
-
-    public Builder disableOzone() {
-      ozoneEnabled = false;
-      return this;
-    }
-
-    public Builder doNotwaitTobeOutofChillMode() {
-      waitForChillModeFinish = false;
-      return this;
-    }
-
-    public Builder setNumOfKSMHandlers(int numOfHandlers) {
-      numOfKsmHandlers = numOfHandlers;
-      return this;
-    }
-
-    public Builder setNumOfSCMHandlers(int numOfHandlers) {
-      numOfScmHandlers = numOfHandlers;
-      return this;
-    }
-
-    public Builder setClusterId(String cId) {
-      clusterId = Optional.of(cId);
-      return this;
-    }
-
-    public Builder setScmId(String sId) {
-      scmId = Optional.of(sId);
-      return this;
-    }
-
-    public Builder setKsmId(String kId) {
-      ksmId = Optional.of(kId);
-      return this;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    public String getRunID() {
-      return runID.toString();
-    }
-
-    @Override
-    public MiniOzoneClassicCluster build() throws IOException {
-
-
-      configureHandler();
-      configureTrace();
-      configureSCMheartbeat();
-      configScmMetadata();
-      initializeScm();
-      initializeKSM();
-
-      conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
-      conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, 
"127.0.0.1:0");
-      conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
-      conf.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
-      conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");
-      conf.set(KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
-      conf.set(ScmConfigKeys.HDDS_REST_HTTP_ADDRESS_KEY, "127.0.0.1:0");
-      conf.set(DFS_DATANODE_PLUGINS_KEY,
-          "org.apache.hadoop.ozone.HddsDatanodeService");
-      conf.set(HDDS_DATANODE_PLUGINS_KEY,
-          "org.apache.hadoop.ozone.web.OzoneHddsDatanodeService");
-
-      // Configure KSM and SCM handlers
-      conf.setInt(ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY, numOfScmHandlers);
-      conf.setInt(KSMConfigKeys.OZONE_KSM_HANDLER_COUNT_KEY, numOfKsmHandlers);
-
-      // Use random ports for ozone containers in mini cluster,
-      // in order to launch multiple container servers per node.
-      conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
-          randomContainerPort);
-
-      StorageContainerManager scm = StorageContainerManager.createSCM(
-          null, conf);
-      scm.start();
-
-      KeySpaceManager ksm = KeySpaceManager.createKSM(null, conf);
-      ksm.start();
-
-      String addressString =  scm.getDatanodeRpcAddress().getHostString() +
-          ":" + scm.getDatanodeRpcAddress().getPort();
-      conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, addressString);
-
-      MiniOzoneClassicCluster cluster =
-          new MiniOzoneClassicCluster(this, scm, ksm);
-      try {
-        cluster.waitOzoneReady();
-        if (waitForChillModeFinish) {
-          cluster.waitTobeOutOfChillMode();
-        }
-        cluster.waitForHeartbeatProcessed();
-      } catch (Exception e) {
-        // A workaround to propagate MiniOzoneCluster failures without
-        // changing the method signature (which would require cascading
-        // changes to hundreds of unrelated HDFS tests).
-        throw new IOException("Failed to start MiniOzoneCluster", e);
-      }
-      return cluster;
-    }
-
-    private void configScmMetadata() throws IOException {
-
-
-      if (scmMetadataDir.isPresent()) {
-        // if user specifies a path in the test, it is assumed that user takes
-        // care of creating and cleaning up that directory after the tests.
-        conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
-            scmMetadataDir.get());
-        return;
-      }
-
-      // If user has not specified a path, create a UUID for this miniCluster
-      // and create SCM under that directory.
-      Path scmPath = Paths.get(path, runID.toString(), "cont-meta");
-      Files.createDirectories(scmPath);
-      Path containerPath = scmPath.resolve(OzoneConsts.CONTAINER_ROOT_PREFIX);
-      Files.createDirectories(containerPath);
-      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath
-          .toString());
-    }
-
-    private void initializeScm() throws IOException {
-      SCMStorage scmStore = new SCMStorage(conf);
-      if (!clusterId.isPresent()) {
-        clusterId = Optional.of(runID.toString());
-      }
-      scmStore.setClusterId(clusterId.get());
-      if (!scmId.isPresent()) {
-        scmId = Optional.of(UUID.randomUUID().toString());
-      }
-      scmStore.setScmId(scmId.get());
-      scmStore.initialize();
-    }
-
-    private void initializeKSM() throws IOException {
-      KSMStorage ksmStore = new KSMStorage(conf);
-      ksmStore.setClusterId(clusterId.get());
-      ksmStore.setScmId(scmId.get());
-      ksmStore.setKsmId(ksmId.orElse(UUID.randomUUID().toString()));
-      ksmStore.initialize();
-    }
-
-    private void configureHandler() {
-      conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
-      if (!ozoneHandlerType.isPresent()) {
-        throw new IllegalArgumentException(
-            "The Ozone handler type must be specified.");
-      } else {
-        conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
-            ozoneHandlerType.get());
-      }
-    }
-
-    private void configureTrace() {
-      if (enableTrace.isPresent()) {
-        conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY,
-            enableTrace.get());
-        GenericTestUtils.setRootLogLevel(Level.TRACE);
-      }
-      GenericTestUtils.setRootLogLevel(Level.INFO);
-    }
-
-    private void configureSCMheartbeat() {
-      if (hbSeconds.isPresent()) {
-        conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL,
-            hbSeconds.get(), TimeUnit.SECONDS);
-
-      } else {
-        conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL,
-            DEFAULT_HB_SECONDS,
-            TimeUnit.SECONDS);
-      }
-
-      if (hbProcessorInterval.isPresent()) {
-        conf.setTimeDuration(
-            ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
-            hbProcessorInterval.get(),
-            TimeUnit.MILLISECONDS);
-      } else {
-        conf.setTimeDuration(
-            ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
-            DEFAULT_PROCESSOR_MS,
-            TimeUnit.MILLISECONDS);
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 46d59de..49f1b46 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -17,46 +17,303 @@
  */
 package org.apache.hadoop.ozone;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.StorageContainerManager;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.ksm.KeySpaceManager;
-import org.apache.hadoop.ozone.web.client.OzoneRestClient;
 import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.test.GenericTestUtils;
 
-import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 /**
  * Interface used for MiniOzoneClusters.
  */
-public interface MiniOzoneCluster extends AutoCloseable, Closeable {
-  void close();
+public interface MiniOzoneCluster {
 
-  boolean restartDataNode(int i) throws IOException;
+  /**
+   * Returns the configuration object associated with the MiniOzoneCluster.
+   *
+   * @return Configuration
+   */
+  Configuration getConf();
 
-  boolean restartDataNode(int i, boolean keepPort) throws IOException;
+  /**
+   * Waits for the cluster to be ready, this call blocks till all the
+   * configured {@link HddsDatanodeService} registers with
+   * {@link StorageContainerManager}.
+   *
+   * @throws TimeoutException In case of timeout
+   * @throws InterruptedException In case of interrupt while waiting
+   */
+  void waitForClusterToBeReady() throws TimeoutException, InterruptedException;
 
-  void shutdown();
+  /**
+   * Waits/blocks till the cluster is out of chill mode.
+   *
+   * @throws TimeoutException TimeoutException In case of timeout
+   * @throws InterruptedException In case of interrupt while waiting
+   */
+  void waitTobeOutOfChillMode() throws TimeoutException, InterruptedException;
 
+  /**
+   * Returns {@link StorageContainerManager} associated with this
+   * {@link MiniOzoneCluster} instance.
+   *
+   * @return {@link StorageContainerManager} instance
+   */
   StorageContainerManager getStorageContainerManager();
 
+  /**
+   * Returns {@link KeySpaceManager} associated with this
+   * {@link MiniOzoneCluster} instance.
+   *
+   * @return {@link KeySpaceManager} instance
+   */
   KeySpaceManager getKeySpaceManager();
 
-  OzoneRestClient createOzoneRestClient() throws OzoneException;
+  /**
+   * Returns the list of {@link HddsDatanodeService} which are part of this
+   * {@link MiniOzoneCluster} instance.
+   *
+   * @return List of {@link HddsDatanodeService}
+   */
+  List<HddsDatanodeService> getHddsDatanodes();
+
+  /**
+   * Returns an {@link OzoneClient} to access the {@link MiniOzoneCluster}.
+   *
+   * @return {@link OzoneClient}
+   * @throws IOException
+   */
+  OzoneClient getClient() throws IOException;
+
+  /**
+   * Returns an RPC based {@link OzoneClient} to access the
+   * {@link MiniOzoneCluster}.
+   *
+   * @return {@link OzoneClient}
+   * @throws IOException
+   */
+  OzoneClient getRpcClient() throws IOException;
+
+  /**
+   * Returns an REST based {@link OzoneClient} to access the
+   * {@link MiniOzoneCluster}.
+   *
+   * @return {@link OzoneClient}
+   * @throws IOException
+   */
+  OzoneClient getRestClient() throws IOException;
 
+  /**
+   * Returns StorageContainerLocationClient to communicate with
+   * {@link StorageContainerManager} associated with the MiniOzoneCluster.
+   *
+   * @return StorageContainerLocation Client
+   * @throws IOException
+   */
   StorageContainerLocationProtocolClientSideTranslatorPB
-  createStorageContainerLocationClient() throws IOException;
+  getStorageContainerLocationClient()  throws IOException;
+
+  /**
+   * Restarts StorageContainerManager instance.
+   *
+   * @throws IOException
+   */
+  void restartStorageContainerManager() throws IOException;
+
+  /**
+   * Restarts KeySpaceManager instance.
+   *
+   * @throws IOException
+   */
+  void restartKeySpaceManager() throws IOException;
+
+  /**
+   * Restart a particular HddsDatanode.
+   *
+   * @param i index of HddsDatanode in the MiniOzoneCluster
+   */
+  void restartHddsDatanode(int i);
+
+  /**
+   * Shutdown a particular HddsDatanode.
+   *
+   * @param i index of HddsDatanode in the MiniOzoneCluster
+   */
+  void shutdownHddsDatanode(int i);
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  void shutdown();
+
+  /**
+   * Returns the Builder to construct MiniOzoneCluster.
+   *
+   * @param conf OzoneConfiguration
+   *
+   * @return MiniOzoneCluster builder
+   */
+  static Builder newBuilder(OzoneConfiguration conf) {
+    return new MiniOzoneClusterImpl.Builder(conf);
+  }
+
+  abstract class Builder {
+
+    protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
+    protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100;
+
+    protected final OzoneConfiguration conf;
+    protected final String path;
+
+    protected String clusterId;
+
+    protected Optional<Boolean> enableTrace = Optional.of(false);
+    protected Optional<Integer> hbInterval = Optional.empty();
+    protected Optional<Integer> hbProcessorInterval = Optional.empty();
+    protected Optional<String> scmId = Optional.empty();
+    protected Optional<String> ksmId = Optional.empty();
+
+    protected Boolean ozoneEnabled = true;
+    protected Boolean randomContainerPort = true;
+
+    // Use a relatively small number of handlers for testing
+    protected int numOfKsmHandlers = 20;
+    protected int numOfScmHandlers = 20;
+    protected int numOfDatanodes = 1;
+
+    protected Builder(OzoneConfiguration conf) {
+      this.conf = conf;
+      this.clusterId = UUID.randomUUID().toString();
+      this.path = GenericTestUtils.getTempPath(
+          MiniOzoneClusterImpl.class.getSimpleName() + "-" + clusterId);
+    }
+
+    /**
+     * Sets the cluster Id.
+     *
+     * @param id cluster Id
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setClusterId(String id) {
+      clusterId = id;
+      return this;
+    }
+
+    /**
+     * Sets the SCM id.
+     *
+     * @param id SCM Id
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setScmId(String id) {
+      scmId = Optional.of(id);
+      return this;
+    }
+
+    /**
+     * Sets the KSM id.
+     *
+     * @param id KSM Id
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setKsmId(String id) {
+      ksmId = Optional.of(id);
+      return this;
+    }
+
+    /**
+     * If set to true container service will be started in a random port.
+     *
+     * @param randomPort enable random port
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setRandomContainerPort(boolean randomPort) {
+      randomContainerPort = randomPort;
+      return this;
+    }
+
+    /**
+     * Sets the number of HddsDatanodes to be started as part of
+     * MiniOzoneCluster.
+     *
+     * @param val number of datanodes
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setNumDatanodes(int val) {
+      numOfDatanodes = val;
+      return this;
+    }
+
+
+    /**
+     * Sets the number of HeartBeat Interval of Datanodes, the value should be
+     * in MilliSeconds.
+     *
+     * @param val HeartBeat interval in milliseconds
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setHbInterval(int val) {
+      hbInterval = Optional.of(val);
+      return this;
+    }
 
-  void waitOzoneReady() throws TimeoutException, InterruptedException;
+    /**
+     * Sets the number of HeartBeat Processor Interval of Datanodes,
+     * the value should be in MilliSeconds.
+     *
+     * @param val HeartBeat Processor interval in milliseconds
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setHbProcessorInterval(int val) {
+      hbProcessorInterval = Optional.of(val);
+      return this;
+    }
 
-  void waitDatanodeOzoneReady(int dnIndex)
-      throws TimeoutException, InterruptedException;
+    /**
+     * When set to true, enables trace level logging.
+     *
+     * @param trace true or false
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setTrace(Boolean trace) {
+      enableTrace = Optional.of(trace);
+      return this;
+    }
 
-  void waitTobeOutOfChillMode() throws TimeoutException,
-      InterruptedException;
+    /**
+     * Modifies the configuration such that Ozone will be disabled.
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder disableOzone() {
+      ozoneEnabled = false;
+      return this;
+    }
 
-  void waitForHeartbeatProcessed() throws TimeoutException,
-      InterruptedException;
+    /**
+     * Constructs and returns MiniOzoneCluster.
+     *
+     * @return {@link MiniOzoneCluster}
+     *
+     * @throws IOException
+     */
+    public abstract MiniOzoneCluster build() throws IOException;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
new file mode 100644
index 0000000..467818b
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.ksm.KeySpaceManager;
+import org.apache.hadoop.hdds.scm.SCMStorage;
+import org.apache.hadoop.ozone.ksm.KSMStorage;
+import org.apache.hadoop.ozone.web.client.OzoneRestClient;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
+    .HEALTHY;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
+
+/**
+ * MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
+ * running tests.  The cluster consists of a KeySpaceManager,
+ * StorageContainerManager and multiple DataNodes.
+ */
+@InterfaceAudience.Private
+public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
+
+  private final OzoneConfiguration conf;
+  private final StorageContainerManager scm;
+  private final KeySpaceManager ksm;
+  private final List<HddsDatanodeService> hddsDatanodes;
+
+  /**
+   * Creates a new MiniOzoneCluster.
+   *
+   * @throws IOException if there is an I/O error
+   */
+  private MiniOzoneClusterImpl(OzoneConfiguration conf,
+                               KeySpaceManager ksm,
+                               StorageContainerManager scm,
+                               List<HddsDatanodeService> hddsDatanodes) {
+    this.conf = conf;
+    this.ksm = ksm;
+    this.scm = scm;
+    this.hddsDatanodes = hddsDatanodes;
+  }
+
+  public OzoneConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Waits for the Ozone cluster to be ready for processing requests.
+   */
+  @Override
+  public void waitForClusterToBeReady()
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      final int healthy = scm.getNodeCount(HEALTHY);
+      final boolean isReady = healthy == hddsDatanodes.size();
+      LOG.info("{}. Got {} of {} DN Heartbeats.",
+          isReady? "Cluster is ready" : "Waiting for cluster to be ready",
+          healthy, hddsDatanodes.size());
+      return isReady;
+    }, 1000, 60 * 1000); //wait for 1 min.
+  }
+
+  /**
+   * Waits for SCM to be out of Chill Mode. Many tests can be run iff we are 
out
+   * of Chill mode.
+   *
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  @Override
+  public void waitTobeOutOfChillMode()
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      if (scm.getScmNodeManager().isOutOfChillMode()) {
+        return true;
+      }
+      LOG.info("Waiting for cluster to be ready. No datanodes found");
+      return false;
+    }, 100, 45000);
+  }
+
+  @Override
+  public StorageContainerManager getStorageContainerManager() {
+    return this.scm;
+  }
+
+  @Override
+  public KeySpaceManager getKeySpaceManager() {
+    return this.ksm;
+  }
+
+  @Override
+  public List<HddsDatanodeService> getHddsDatanodes() {
+    return hddsDatanodes;
+  }
+
+  @Override
+  public OzoneClient getClient() throws IOException {
+    return OzoneClientFactory.getClient(conf);
+  }
+
+  @Override
+  public OzoneClient getRpcClient() throws IOException {
+    return OzoneClientFactory.getRpcClient(conf);
+  }
+
+  /**
+   * Creates an {@link OzoneClient} connected to this cluster's REST
+   * service. Callers take ownership of the client and must close it when done.
+   *
+   * @return OzoneClient connected to this cluster's REST service
+   * @throws IOException if there is an I/O error creating the client
+   */
+  @Override
+  public OzoneClient getRestClient() throws IOException {
+    return OzoneClientFactory.getRestClient(conf);
+  }
+
+  /**
+   * Returns an RPC proxy connected to this cluster's StorageContainerManager
+   * for accessing container location information.  Callers take ownership of
+   * the proxy and must close it when done.
+   *
+   * @return RPC proxy for accessing container location information
+   * @throws IOException if there is an I/O error
+   */
+  @Override
+  public StorageContainerLocationProtocolClientSideTranslatorPB
+  getStorageContainerLocationClient() throws IOException {
+    long version = RPC.getProtocolVersion(
+        StorageContainerLocationProtocolPB.class);
+    InetSocketAddress address = scm.getClientRpcAddress();
+    LOG.info(
+        "Creating StorageContainerLocationProtocol RPC client with address {}",
+        address);
+    return new StorageContainerLocationProtocolClientSideTranslatorPB(
+        RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
+            address, UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getDefaultSocketFactory(conf),
+            Client.getRpcTimeout(conf)));
+  }
+
+  @Override
+  public void restartStorageContainerManager() throws IOException {
+    scm.stop();
+    scm.start();
+  }
+
+  @Override
+  public void restartKeySpaceManager() throws IOException {
+    ksm.stop();
+    ksm.start();
+  }
+
+  @Override
+  public void restartHddsDatanode(int i) {
+    HddsDatanodeService datanodeService = hddsDatanodes.get(i);
+    datanodeService.stop();
+    datanodeService.join();
+    datanodeService.start(null);
+  }
+
+  @Override
+  public void shutdownHddsDatanode(int i) {
+    hddsDatanodes.get(i).stop();
+  }
+
+  @Override
+  public void shutdown() {
+    try {
+      LOG.info("Shutting down the Mini Ozone Cluster");
+
+      File baseDir = new File(GenericTestUtils.getTempPath(
+          MiniOzoneClusterImpl.class.getSimpleName() + "-" +
+              scm.getScmInfo().getClusterId()));
+      FileUtils.deleteDirectory(baseDir);
+
+      if (ksm != null) {
+        LOG.info("Shutting down the keySpaceManager");
+        ksm.stop();
+        ksm.join();
+      }
+
+      if (scm != null) {
+        LOG.info("Shutting down the StorageContainerManager");
+        scm.stop();
+        scm.join();
+      }
+
+      if (!hddsDatanodes.isEmpty()) {
+        LOG.info("Shutting down the HddsDatanodes");
+        for (HddsDatanodeService hddsDatanode : hddsDatanodes) {
+          hddsDatanode.stop();
+          hddsDatanode.join();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Exception while shutting down the cluster.", e);
+    }
+  }
+
+  /**
+   * Builder for configuring the MiniOzoneCluster to run.
+   */
+  public static class Builder extends MiniOzoneCluster.Builder {
+
+    /**
+     * Creates a new Builder.
+     *
+     * @param conf configuration
+     */
+    public Builder(OzoneConfiguration conf) {
+      super(conf);
+    }
+
+    @Override
+    public MiniOzoneCluster build() throws IOException {
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      initializeConfiguration();
+      StorageContainerManager scm = createSCM();
+      scm.start();
+      KeySpaceManager ksm = createKSM();
+      ksm.start();
+      List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
+      hddsDatanodes.forEach((datanode) -> datanode.start(null));
+      return new MiniOzoneClusterImpl(conf, ksm, scm, hddsDatanodes);
+    }
+
+    /**
+     * Initializes the configuration required for starting MiniOzoneCluster.
+     *
+     * @throws IOException
+     */
+    private void initializeConfiguration() throws IOException {
+      conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, ozoneEnabled);
+      Path metaDir = Paths.get(path, "ozone-meta");
+      Files.createDirectories(metaDir);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
+      configureTrace();
+    }
+
+    /**
+     * Creates a new StorageContainerManager instance.
+     *
+     * @return {@link StorageContainerManager}
+     *
+     * @throws IOException
+     */
+    private StorageContainerManager createSCM() throws IOException {
+      configureSCM();
+      SCMStorage scmStore = new SCMStorage(conf);
+      scmStore.setClusterId(clusterId);
+      if (!scmId.isPresent()) {
+        scmId = Optional.of(UUID.randomUUID().toString());
+      }
+      scmStore.setScmId(scmId.get());
+      scmStore.initialize();
+      return StorageContainerManager.createSCM(null, conf);
+    }
+
+    /**
+     * Creates a new KeySpaceManager instance.
+     *
+     * @return {@link KeySpaceManager}
+     *
+     * @throws IOException
+     */
+    private KeySpaceManager createKSM() throws IOException {
+      configureKSM();
+      KSMStorage ksmStore = new KSMStorage(conf);
+      ksmStore.setClusterId(clusterId);
+      ksmStore.setScmId(scmId.get());
+      ksmStore.setKsmId(ksmId.orElse(UUID.randomUUID().toString()));
+      ksmStore.initialize();
+      return KeySpaceManager.createKSM(null, conf);
+    }
+
+    /**
+     * Creates HddsDatanodeService(s) instance.
+     *
+     * @return List of HddsDatanodeService
+     *
+     * @throws IOException
+     */
+    private List<HddsDatanodeService> createHddsDatanodes(
+        StorageContainerManager scm) throws IOException {
+      configureHddsDatanodes();
+      String scmAddress =  scm.getDatanodeRpcAddress().getHostString() +
+          ":" + scm.getDatanodeRpcAddress().getPort();
+      conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
+      List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();
+      for (int i = 0; i < numOfDatanodes; i++) {
+        Configuration dnConf = new OzoneConfiguration(conf);
+        String datanodeBaseDir = path + "/datanode-" + Integer.toString(i);
+        Path metaDir = Paths.get(datanodeBaseDir, "meta");
+        Path dataDir = Paths.get(datanodeBaseDir, "data", "containers");
+        Path ratisDir = Paths.get(datanodeBaseDir, "data", "ratis");
+        Files.createDirectories(metaDir);
+        Files.createDirectories(dataDir);
+        Files.createDirectories(ratisDir);
+        dnConf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
+        dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, 
dataDir.toString());
+        dnConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+            ratisDir.toString());
+
+        hddsDatanodes.add(
+            HddsDatanodeService.createHddsDatanodeService(dnConf));
+      }
+      return hddsDatanodes;
+    }
+
+    private void configureSCM() {
+      conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
+      conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, 
"127.0.0.1:0");
+      conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
+      conf.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+      conf.setInt(ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY, numOfScmHandlers);
+      configureSCMheartbeat();
+    }
+
+    private void configureSCMheartbeat() {
+      if (hbInterval.isPresent()) {
+        conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL,
+            hbInterval.get(), TimeUnit.MILLISECONDS);
+
+      } else {
+        conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL,
+            DEFAULT_HB_INTERVAL_MS,
+            TimeUnit.MILLISECONDS);
+      }
+
+      if (hbProcessorInterval.isPresent()) {
+        conf.setTimeDuration(
+            ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+            hbProcessorInterval.get(),
+            TimeUnit.MILLISECONDS);
+      } else {
+        conf.setTimeDuration(
+            ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+            DEFAULT_HB_PROCESSOR_INTERVAL_MS,
+            TimeUnit.MILLISECONDS);
+      }
+    }
+
+
+    private void configureKSM() {
+      conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");
+      conf.set(KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+      conf.setInt(KSMConfigKeys.OZONE_KSM_HANDLER_COUNT_KEY, numOfKsmHandlers);
+    }
+
+    private void configureHddsDatanodes() {
+      conf.set(ScmConfigKeys.HDDS_REST_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+      conf.set(HDDS_DATANODE_PLUGINS_KEY,
+          "org.apache.hadoop.ozone.web.OzoneHddsDatanodeService");
+      conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+          randomContainerPort);
+      conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+          randomContainerPort);
+    }
+
+    private void configureTrace() {
+      final boolean traceEnabled = enableTrace.orElse(false);
+      conf.setBoolean(
+          OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY, traceEnabled);
+      // TRACE only when requested; previously INFO unconditionally won.
+      GenericTestUtils.setRootLogLevel(
+          traceEnabled ? Level.TRACE : Level.INFO);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneTestHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneTestHelper.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneTestHelper.java
deleted file mode 100644
index 7acfefb..0000000
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneTestHelper.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.util.ServicePlugin;
-
-import java.lang.reflect.Field;
-import java.util.List;
-
-/**
- * Stateless helper functions for MiniOzone based tests.
- */
-public class MiniOzoneTestHelper {
-
-  private MiniOzoneTestHelper() {
-  }
-
-  public static DatanodeDetails getDatanodeDetails(DataNode dataNode) {
-    return findHddsPlugin(dataNode).getDatanodeDetails();
-  }
-
-  public static int getOzoneRestPort(DataNode dataNode) {
-    return MiniOzoneTestHelper.getDatanodeDetails(dataNode).getOzoneRestPort();
-  }
-
-  public static OzoneContainer getOzoneContainer(DataNode dataNode) {
-    return findHddsPlugin(dataNode).getDatanodeStateMachine()
-        .getContainer();
-  }
-
-  public static ContainerManager getOzoneContainerManager(DataNode dataNode) {
-    return findHddsPlugin(dataNode).getDatanodeStateMachine()
-        .getContainer().getContainerManager();
-
-  }
-  public static DatanodeStateMachine getStateMachine(DataNode dataNode) {
-    return findHddsPlugin(dataNode).getDatanodeStateMachine();
-  }
-
-  private static HddsDatanodeService findHddsPlugin(DataNode dataNode) {
-    try {
-      Field pluginsField = DataNode.class.getDeclaredField("plugins");
-      pluginsField.setAccessible(true);
-      List<ServicePlugin> plugins =
-          (List<ServicePlugin>) pluginsField.get(dataNode);
-
-      for (ServicePlugin plugin : plugins) {
-        if (plugin instanceof HddsDatanodeService) {
-          return (HddsDatanodeService) plugin;
-        }
-      }
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      e.printStackTrace();
-    }
-    throw new IllegalStateException("Can't find the Hdds server plugin in the"
-        + " plugin collection of datanode");
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 7d7badd..9aefe9a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -44,10 +44,10 @@ public interface RatisTestHelper {
     static final int NUM_DATANODES = 3;
 
     private final OzoneConfiguration conf;
-    private final MiniOzoneClassicCluster cluster;
+    private final MiniOzoneCluster cluster;
 
     /**
-     * Create a {@link MiniOzoneClassicCluster} for testing by setting
+     * Create a {@link MiniOzoneCluster} for testing by setting
      *   OZONE_ENABLED = true,
      *   RATIS_ENABLED = true, and
      *   OZONE_HANDLER_TYPE_KEY = "distributed".
@@ -61,14 +61,10 @@ public interface RatisTestHelper {
       return conf;
     }
 
-    public MiniOzoneClassicCluster getCluster() {
+    public MiniOzoneCluster getCluster() {
       return cluster;
     }
 
-    public int getDatanodeInfoPort() {
-      return cluster.getDataNodes().get(0).getInfoPort();
-    }
-
     public OzoneRestClient newOzoneRestClient()
         throws OzoneException, URISyntaxException {
       return RatisTestHelper.newOzoneRestClient(getDatanodeOzoneRestPort());
@@ -76,12 +72,12 @@ public interface RatisTestHelper {
 
     @Override
     public void close() {
-      cluster.close();
+      cluster.shutdown();
     }
 
     public int getDatanodeOzoneRestPort() {
-      return MiniOzoneTestHelper.getOzoneRestPort(
-          cluster.getDataNodes().get(0));
+      return cluster.getHddsDatanodes().get(0).getDatanodeDetails()
+          .getOzoneRestPort();
     }
   }
 
@@ -100,12 +96,10 @@ public interface RatisTestHelper {
         + " = " + rpc.name());
   }
 
-  static MiniOzoneClassicCluster newMiniOzoneCluster(
+  static MiniOzoneCluster newMiniOzoneCluster(
       int numDatanodes, OzoneConfiguration conf) throws IOException {
-    final MiniOzoneClassicCluster cluster =
-        new MiniOzoneClassicCluster.Builder(conf)
-        .numDataNodes(numDatanodes)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    final MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(numDatanodes).build();
     return cluster;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index af7d1b8..20579fd 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -49,22 +49,19 @@ public class TestContainerOperations {
   @BeforeClass
   public static void setup() throws Exception {
     int containerSizeGB = 5;
-    long datanodeCapacities = 3 * OzoneConsts.TB;
     ContainerOperationClient.setContainerSizeB(
         containerSizeGB * OzoneConsts.GB);
     ozoneConf = new OzoneConfiguration();
     ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
-    cluster = new MiniOzoneClassicCluster.Builder(ozoneConf).numDataNodes(1)
-        .storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    cluster = 
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(1).build();
     StorageContainerLocationProtocolClientSideTranslatorPB client =
-        cluster.createStorageContainerLocationClient();
+        cluster.getStorageContainerLocationClient();
     RPC.setProtocolEngine(ozoneConf, StorageContainerLocationProtocolPB.class,
         ProtobufRpcEngine.class);
     storageClient = new ContainerOperationClient(
         client, new XceiverClientManager(ozoneConf));
-    cluster.waitForHeartbeatProcessed();
+    cluster.waitForClusterToBeReady();
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 19ac0e3..6755e34 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -34,7 +33,6 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.TestGenericTestUtils;
-import org.apache.hadoop.util.ServicePlugin;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -56,7 +54,7 @@ import static org.junit.Assert.*;
  */
 public class TestMiniOzoneCluster {
 
-  private static MiniOzoneClassicCluster cluster;
+  private static MiniOzoneCluster cluster;
   private static OzoneConfiguration conf;
 
   private final static File TEST_ROOT = TestGenericTestUtils.getTestDir();
@@ -79,24 +77,22 @@ public class TestMiniOzoneCluster {
   public static void cleanup() {
     if (cluster != null) {
       cluster.shutdown();
-      cluster.close();
     }
   }
 
   @Test(timeout = 30000)
   public void testStartMultipleDatanodes() throws Exception {
     final int numberOfNodes = 3;
-    cluster = new MiniOzoneClassicCluster.Builder(conf)
-        .numDataNodes(numberOfNodes)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(numberOfNodes)
         .build();
-    List<DataNode> datanodes = cluster.getDataNodes();
+    cluster.waitForClusterToBeReady();
+    List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
     assertEquals(numberOfNodes, datanodes.size());
-    for(DataNode dn : datanodes) {
+    for(HddsDatanodeService dn : datanodes) {
       // Create a single member pipe line
       String containerName = OzoneUtils.getRequestID();
-      DatanodeDetails datanodeDetails =
-          MiniOzoneTestHelper.getDatanodeDetails(dn);
+      DatanodeDetails datanodeDetails = dn.getDatanodeDetails();
       final PipelineChannel pipelineChannel =
           new PipelineChannel(datanodeDetails.getUuidString(),
               HddsProtos.LifeCycleState.OPEN,
@@ -133,15 +129,6 @@ public class TestMiniOzoneCluster {
     assertEquals(id1, validId);
     assertEquals(id1.getProtoBufMessage(), validId.getProtoBufMessage());
 
-    // Write should fail if unable to create file or directory
-    File invalidPath = new File(WRITE_TMP, "an/invalid/path");
-    try {
-      ContainerUtils.writeDatanodeDetailsTo(id1, invalidPath);
-      Assert.fail();
-    } catch (Exception e) {
-      assertTrue(e instanceof IOException);
-    }
-
     // Read should return an empty value if file doesn't exist
     File nonExistFile = new File(READ_TMP, "non_exist.id");
     nonExistFile.delete();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 3fa02e4..fa307c9 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -37,7 +37,6 @@ import 
org.apache.hadoop.hdds.scm.StorageContainerManager.StartupOption;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -62,7 +61,6 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 
-import org.apache.hadoop.io.IOUtils;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -87,7 +85,7 @@ public class TestStorageContainerManager {
   public ExpectedException exception = ExpectedException.none();
 
   @Test
-  public void testRpcPermission() throws IOException {
+  public void testRpcPermission() throws Exception {
     // Test with default configuration
     OzoneConfiguration defaultConf = new OzoneConfiguration();
     testRpcPermissionWithConf(defaultConf, "unknownUser", true);
@@ -104,11 +102,9 @@ public class TestStorageContainerManager {
 
   private void testRpcPermissionWithConf(
       OzoneConfiguration ozoneConf, String fakeRemoteUsername,
-      boolean expectPermissionDenied) throws IOException {
-    MiniOzoneCluster cluster =
-        new MiniOzoneClassicCluster.Builder(ozoneConf).numDataNodes(1)
-            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-
+      boolean expectPermissionDenied) throws Exception {
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(ozoneConf).build();
+    cluster.waitForClusterToBeReady();
     try {
       String fakeUser = fakeRemoteUsername;
       StorageContainerManager mockScm = Mockito.spy(
@@ -172,7 +168,7 @@ public class TestStorageContainerManager {
         }
       }
     } finally {
-      IOUtils.cleanupWithLogger(null, cluster);
+      cluster.shutdown();
     }
   }
 
@@ -201,9 +197,8 @@ public class TestStorageContainerManager {
     conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
         numKeys);
 
-    MiniOzoneClassicCluster cluster =
-        new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
-            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
 
     try {
       DeletedBlockLog delLog = cluster.getStorageContainerManager()
@@ -269,19 +264,17 @@ public class TestStorageContainerManager {
   public void testBlockDeletingThrottling() throws Exception {
     int numKeys = 15;
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL, 5,
-        TimeUnit.SECONDS);
-    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
-        3000, TimeUnit.MILLISECONDS);
     conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
     conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
         1000, TimeUnit.MILLISECONDS);
     conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
         numKeys);
 
-    MiniOzoneClassicCluster cluster = new MiniOzoneClassicCluster.Builder(conf)
-        .numDataNodes(1).setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setHbInterval(5000)
+        .setHbProcessorInterval(3000)
         .build();
+    cluster.waitForClusterToBeReady();
 
     DeletedBlockLog delLog = cluster.getStorageContainerManager()
         .getScmBlockManager().getDeletedBlockLog();
@@ -402,14 +395,15 @@ public class TestStorageContainerManager {
     conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
     //This will set the cluster id in the version file
     MiniOzoneCluster cluster =
-        new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
-            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
     StartupOption.INIT.setClusterId("testClusterId");
     // This will initialize SCM
     StorageContainerManager.scmInit(conf);
     SCMStorage scmStore = new SCMStorage(conf);
     Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
     Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
+    cluster.shutdown();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index dff303a..7005ea0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -55,11 +54,11 @@ import java.util.Set;
  */
 public class TestStorageContainerManagerHelper {
 
-  private final MiniOzoneClassicCluster cluster;
+  private final MiniOzoneCluster cluster;
   private final Configuration conf;
   private final StorageHandler storageHandler;
 
-  public TestStorageContainerManagerHelper(MiniOzoneClassicCluster cluster,
+  public TestStorageContainerManagerHelper(MiniOzoneCluster cluster,
       Configuration conf) throws IOException {
     this.cluster = cluster;
     this.conf = conf;
@@ -169,10 +168,9 @@ public class TestStorageContainerManagerHelper {
 
   private OzoneContainer getContainerServerByDatanodeUuid(String dnUUID)
       throws IOException {
-    for (DataNode dn : cluster.getDataNodes()) {
-      if (MiniOzoneTestHelper.getDatanodeDetails(dn).getUuidString()
-          .equals(dnUUID)) {
-        return MiniOzoneTestHelper.getOzoneContainer(dn);
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      if (dn.getDatanodeDetails().getUuidString().equals(dnUUID)) {
+        return dn.getDatanodeStateMachine().getContainer();
       }
     }
     throw new IOException("Unable to get the ozone container "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
index cc72a79..a94ee6c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.client.rest;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -58,7 +58,7 @@ public class TestOzoneRestClient {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private static MiniOzoneClassicCluster cluster = null;
+  private static MiniOzoneCluster cluster = null;
   private static OzoneClient ozClient = null;
   private static ObjectStore store = null;
 
@@ -75,9 +75,8 @@ public class TestOzoneRestClient {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
     InetSocketAddress ksmHttpAddress = cluster.getKeySpaceManager()
         .getHttpServer().getHttpAddress();
     ozClient = OzoneClientFactory.getRestClient(ksmHttpAddress.getHostName(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 59eb7cf..32a70a2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.client.rpc;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -70,7 +70,7 @@ public class TestOzoneRpcClient {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private static MiniOzoneClassicCluster cluster = null;
+  private static MiniOzoneCluster cluster = null;
   private static OzoneClient ozClient = null;
   private static ObjectStore store = null;
   private static KeySpaceManager keySpaceManager;
@@ -91,12 +91,12 @@ public class TestOzoneRpcClient {
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
     conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
-    cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(10)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10).build();
+    cluster.waitForClusterToBeReady();
     ozClient = OzoneClientFactory.getRpcClient(conf);
     store = ozClient.getObjectStore();
     storageContainerLocationClient =
-        cluster.createStorageContainerLocationClient();
+        cluster.getStorageContainerLocationClient();
     keySpaceManager = cluster.getKeySpaceManager();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06d228a3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index f93dbc8..0034e8e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -19,9 +19,7 @@ package 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
-import org.apache.hadoop.ozone.MiniOzoneTestHelper;
-import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
@@ -55,10 +53,9 @@ public class TestCloseContainerHandler {
     //setup a cluster (1G free space is enough for a unit test)
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(OZONE_SCM_CONTAINER_SIZE_GB, "1");
-    MiniOzoneClassicCluster cluster =
-        new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
-            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-    cluster.waitOzoneReady();
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
 
     //the easiest way to create an open container is creating a key
     OzoneClient client = OzoneClientFactory.getClient(conf);
@@ -86,8 +83,8 @@ public class TestCloseContainerHandler {
 
     Assert.assertFalse(isContainerClosed(cluster, containerName));
 
-    DatanodeDetails datanodeDetails = MiniOzoneTestHelper
-        .getDatanodeDetails(cluster.getDataNodes().get(0));
+    DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
+        .getDatanodeDetails();
     //send the order to close the container
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(),
@@ -101,12 +98,13 @@ public class TestCloseContainerHandler {
     Assert.assertTrue(isContainerClosed(cluster, containerName));
   }
 
-  private Boolean isContainerClosed(MiniOzoneClassicCluster cluster,
+  private Boolean isContainerClosed(MiniOzoneCluster cluster,
       String containerName) {
     ContainerData containerData;
     try {
-      containerData = MiniOzoneTestHelper.getOzoneContainerManager(cluster
-          .getDataNodes().get(0)).readContainer(containerName);
+      containerData = cluster.getHddsDatanodes().get(0)
+          .getDatanodeStateMachine().getContainer().getContainerManager()
+          .readContainer(containerName);
       return !containerData.isOpen();
     } catch (StorageContainerException e) {
       throw new AssertionError(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to