This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bcfb64a  HDDS-3231. Cleanup KeyManagerImpl (#2961)
bcfb64a is described below

commit bcfb64ae588c896fd540abcfb860cc9ddb9a2523
Author: GeorgeJahad <[email protected]>
AuthorDate: Tue Jan 11 21:24:01 2022 -0800

    HDDS-3231. Cleanup KeyManagerImpl (#2961)
---
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 361 ++++----
 .../org/apache/hadoop/ozone/om/KeyManager.java     | 111 ---
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 912 ---------------------
 .../apache/hadoop/ozone/om/fs/OzoneManagerFS.java  |   6 -
 .../org/apache/hadoop/ozone/om/OmTestManagers.java | 122 +++
 .../hadoop/ozone/om/TestKeyDeletingService.java    |  75 +-
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  56 +-
 .../apache/hadoop/ozone/om/TestTrashService.java   |  34 +-
 .../security/acl/TestOzoneNativeAuthorizer.java    |  77 +-
 .../hadoop/ozone/security/acl/TestParentAcl.java   |  44 +-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java |  41 +-
 .../ozone/genesis/BenchMarkOzoneManager.java       | 193 -----
 .../org/apache/hadoop/ozone/genesis/Genesis.java   |   3 +-
 13 files changed, 488 insertions(+), 1547 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 1af0b43..2ddac31 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -67,7 +68,6 @@ import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -83,6 +83,7 @@ import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
@@ -152,7 +153,11 @@ public class TestKeyManagerImpl {
   private static long scmBlockSize;
   private static final String KEY_NAME = "key1";
   private static final String BUCKET_NAME = "bucket1";
+  private static final String VERSIONED_BUCKET_NAME = "versionedBucket1";
   private static final String VOLUME_NAME = "vol1";
+  private static OzoneManagerProtocol writeClient;
+  private static OzoneManager om;
+
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
@@ -164,7 +169,6 @@ public class TestKeyManagerImpl {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
     conf.set(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, "true");
     mockScmBlockLocationProtocol = mock(ScmBlockLocationProtocol.class);
-    metadataManager = new OmMetadataManagerImpl(conf);
     nodeManager = new MockNodeManager(true, 10);
     NodeSchema[] schemas = new NodeSchema[]
         {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
@@ -191,10 +195,17 @@ public class TestKeyManagerImpl {
 
     mockScmContainerClient =
         Mockito.mock(StorageContainerLocationProtocol.class);
-    keyManager =
-        new KeyManagerImpl(scm.getBlockProtocolServer(),
-            mockScmContainerClient, metadataManager, conf, "om1", null);
-    prefixManager = new PrefixManagerImpl(metadataManager, false);
+    
+    OmTestManagers omTestManagers
+        = new OmTestManagers(conf, scm.getBlockProtocolServer(),
+        mockScmContainerClient);
+    om = omTestManagers.getOzoneManager();
+    metadataManager = omTestManagers.getMetadataManager();
+    keyManager = (KeyManagerImpl)omTestManagers.getKeyManager();
+    prefixManager = omTestManagers.getPrefixManager();
+    writeClient = omTestManagers.getWriteClient();
+
+    mockContainerClient();
 
     Mockito.when(mockScmBlockLocationProtocol
         .allocateBlock(Mockito.anyLong(), Mockito.anyInt(),
@@ -204,39 +215,57 @@ public class TestKeyManagerImpl {
                 new SCMException("SafeModePrecheck failed for allocateBlock",
             ResultCodes.SAFE_MODE_EXCEPTION));
     createVolume(VOLUME_NAME);
-    createBucket(VOLUME_NAME, BUCKET_NAME);
+    createBucket(VOLUME_NAME, BUCKET_NAME, false);
+    createBucket(VOLUME_NAME, VERSIONED_BUCKET_NAME, true);
   }
 
   @AfterClass
   public static void cleanup() throws Exception {
     scm.stop();
     scm.join();
-    metadataManager.stop();
-    keyManager.stop();
+    om.stop();
     FileUtils.deleteDirectory(dir);
   }
 
   @After
   public void cleanupTest() throws IOException {
+    mockContainerClient();
     List<OzoneFileStatus> fileStatuses = keyManager
         .listStatus(createBuilder().setKeyName("").build(), true, "", 100000);
     for (OzoneFileStatus fileStatus : fileStatuses) {
       if (fileStatus.isFile()) {
-        keyManager.deleteKey(
+        writeClient.deleteKey(
             createKeyArgs(fileStatus.getKeyInfo().getKeyName()));
       } else {
-        keyManager.deleteKey(createKeyArgs(OzoneFSUtils
+        writeClient.deleteKey(createKeyArgs(OzoneFSUtils
             .addTrailingSlashIfNeeded(
                 fileStatus.getKeyInfo().getKeyName())));
       }
     }
   }
 
-  private static void createBucket(String volumeName, String bucketName)
+  private static void mockContainerClient() {
+    ScmClient scmClient = new ScmClient(scm.getBlockProtocolServer(),
+        mockScmContainerClient);
+    HddsWhiteboxTestUtils.setInternalState(keyManager,
+        "scmClient", scmClient);
+    HddsWhiteboxTestUtils.setInternalState(om,
+        "scmClient", scmClient);
+  }
+  private static void mockBlockClient() {
+    ScmClient scmClient = new ScmClient(mockScmBlockLocationProtocol, null);
+    HddsWhiteboxTestUtils.setInternalState(keyManager,
+        "scmClient", scmClient);
+    HddsWhiteboxTestUtils.setInternalState(om,
+        "scmClient", scmClient);
+  }
+  private static void createBucket(String volumeName, String bucketName,
+                                   boolean isVersionEnabled)
       throws IOException {
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
+        .setIsVersionEnabled(isVersionEnabled)
         .build();
 
     TestOMRequestUtils.addBucketToOM(metadataManager, bucketInfo);
@@ -253,8 +282,7 @@ public class TestKeyManagerImpl {
 
   @Test
   public void allocateBlockFailureInSafeMode() throws Exception {
-    KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
-        metadataManager, conf, "om1", null);
+    mockBlockClient();
     OmKeyArgs keyArgs = createBuilder()
         .setKeyName(KEY_NAME)
         .build();
@@ -278,7 +306,7 @@ public class TestKeyManagerImpl {
         omKeyInfo);
     LambdaTestUtils.intercept(OMException.class,
         "SafeModePrecheck failed for allocateBlock", () -> {
-          keyManager1
+          writeClient
               .allocateBlock(keyArgs, 1L, new ExcludeList());
         });
   }
@@ -286,8 +314,7 @@ public class TestKeyManagerImpl {
   @Test
   public void openKeyFailureInSafeMode() throws Exception {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
-        metadataManager, conf, "om1", null);
+    mockBlockClient();
     OmKeyArgs keyArgs = createBuilder()
         .setKeyName(KEY_NAME)
         .setDataSize(1000)
@@ -297,7 +324,7 @@ public class TestKeyManagerImpl {
         .build();
     LambdaTestUtils.intercept(OMException.class,
         "SafeModePrecheck failed for allocateBlock", () -> {
-          keyManager1.openKey(keyArgs);
+          writeClient.openKey(keyArgs);
         });
   }
 
@@ -307,7 +334,7 @@ public class TestKeyManagerImpl {
         .setKeyName(UUID.randomUUID().toString())
         .setDataSize(scmBlockSize * 10)
         .build();
-    OpenKeySession keySession = keyManager.openKey(keyArgs);
+    OpenKeySession keySession = writeClient.openKey(keyArgs);
     OmKeyInfo keyInfo = keySession.getKeyInfo();
     Assert.assertEquals(10,
         keyInfo.getLatestVersionLocations().getLocationList().size());
@@ -325,7 +352,7 @@ public class TestKeyManagerImpl {
       keyNameBuf.append("/").append(RandomStringUtils.randomAlphabetic(5));
     }
     String keyName = keyNameBuf.toString();
-    keyManager.createDirectory(keyArgs);
+    writeClient.createDirectory(keyArgs);
     Path path = Paths.get(keyName);
     while (path != null) {
       // verify parent directories are created
@@ -338,32 +365,24 @@ public class TestKeyManagerImpl {
     keyArgs = createBuilder()
         .setKeyName(keyName)
         .build();
-    OpenKeySession keySession = keyManager.openKey(keyArgs);
+    OpenKeySession keySession = writeClient.openKey(keyArgs);
     keyArgs.setLocationInfoList(
         keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
-    keyManager.commitKey(keyArgs, keySession.getId());
+    writeClient.commitKey(keyArgs, keySession.getId());
     try {
-      keyManager.createDirectory(keyArgs);
+      writeClient.createDirectory(keyArgs);
       Assert.fail("Creation should fail for directory.");
     } catch (OMException e) {
       Assert.assertEquals(e.getResult(),
           OMException.ResultCodes.FILE_ALREADY_EXISTS);
     }
 
-    // create directory for root directory
-    keyName = "";
-    keyArgs = createBuilder()
-        .setKeyName(keyName)
-        .build();
-    keyManager.createDirectory(keyArgs);
-    Assert.assertTrue(keyManager.getFileStatus(keyArgs).isDirectory());
-
     // create directory where parent is root
     keyName = RandomStringUtils.randomAlphabetic(5);
     keyArgs = createBuilder()
         .setKeyName(keyName)
         .build();
-    keyManager.createDirectory(keyArgs);
+    writeClient.createDirectory(keyArgs);
     OzoneFileStatus fileStatus = keyManager.getFileStatus(keyArgs);
     Assert.assertTrue(fileStatus.isDirectory());
     Assert.assertTrue(fileStatus.getKeyInfo().getKeyLocationVersions().get(0)
@@ -377,14 +396,14 @@ public class TestKeyManagerImpl {
     OmKeyArgs keyArgs = createBuilder()
         .setKeyName(keyName)
         .build();
-    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    OpenKeySession keySession = writeClient.createFile(keyArgs, false, false);
     keyArgs.setLocationInfoList(
         keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
-    keyManager.commitKey(keyArgs, keySession.getId());
+    writeClient.commitKey(keyArgs, keySession.getId());
 
     // try to open created key with overWrite flag set to false
     try {
-      keyManager.createFile(keyArgs, false, false);
+      writeClient.createFile(keyArgs, false, false);
       Assert.fail("Open key should fail for non overwrite create");
     } catch (OMException ex) {
       if (ex.getResult() != OMException.ResultCodes.FILE_ALREADY_EXISTS) {
@@ -393,7 +412,7 @@ public class TestKeyManagerImpl {
     }
 
     // create file should pass with overwrite flag set to true
-    keyManager.createFile(keyArgs, true, false);
+    writeClient.createFile(keyArgs, true, false);
 
     // try to create a file where parent directories do not exist and
     // recursive flag is set to false
@@ -407,7 +426,7 @@ public class TestKeyManagerImpl {
         .setKeyName(keyName)
         .build();
     try {
-      keyManager.createFile(keyArgs, false, false);
+      writeClient.createFile(keyArgs, false, false);
       Assert.fail("Open file should fail for non recursive write");
     } catch (OMException ex) {
       if (ex.getResult() != OMException.ResultCodes.DIRECTORY_NOT_FOUND) {
@@ -416,10 +435,10 @@ public class TestKeyManagerImpl {
     }
 
     // file create should pass when recursive flag is set to true
-    keySession = keyManager.createFile(keyArgs, false, true);
+    keySession = writeClient.createFile(keyArgs, false, true);
     keyArgs.setLocationInfoList(
         keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
-    keyManager.commitKey(keyArgs, keySession.getId());
+    writeClient.commitKey(keyArgs, keySession.getId());
     Assert.assertTrue(keyManager
         .getFileStatus(keyArgs).isFile());
 
@@ -428,7 +447,7 @@ public class TestKeyManagerImpl {
         .setKeyName("")
         .build();
     try {
-      keyManager.createFile(keyArgs, true, true);
+      writeClient.createFile(keyArgs, true, true);
       Assert.fail("Open file should fail for non recursive write");
     } catch (OMException ex) {
       if (ex.getResult() != OMException.ResultCodes.NOT_A_FILE) {
@@ -442,10 +461,10 @@ public class TestKeyManagerImpl {
     OmKeyArgs keyArgs = createBuilder()
         .setKeyName("testdir/deep/NOTICE.txt")
         .build();
-    OpenKeySession keySession = keyManager.createFile(keyArgs, false, true);
+    OpenKeySession keySession = writeClient.createFile(keyArgs, false, true);
     keyArgs.setLocationInfoList(
         keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
-    keyManager.commitKey(keyArgs, keySession.getId());
+    writeClient.commitKey(keyArgs, keySession.getId());
 
     OzoneObj fileKey = OzoneObjInfo.Builder.fromKeyArgs(keyArgs)
         .setStoreType(OzoneObj.StoreType.OZONE)
@@ -453,11 +472,6 @@ public class TestKeyManagerImpl {
     RequestContext context = currentUserReads();
     Assert.assertTrue(keyManager.checkAccess(fileKey, context));
 
-    OzoneObj parentDirKey = OzoneObjInfo.Builder.fromKeyArgs(keyArgs)
-        .setStoreType(OzoneObj.StoreType.OZONE)
-        .setKeyName("testdir")
-        .build();
-    Assert.assertTrue(keyManager.checkAccess(parentDirKey, context));
   }
 
   @Test
@@ -477,7 +491,7 @@ public class TestKeyManagerImpl {
     OmKeyArgs keyArgs = createBuilder()
         .setKeyName("some/dir")
         .build();
-    keyManager.createDirectory(keyArgs);
+    writeClient.createDirectory(keyArgs);
 
     OzoneObj dirKey = OzoneObjInfo.Builder.fromKeyArgs(keyArgs)
         .setStoreType(OzoneObj.StoreType.OZONE)
@@ -501,9 +515,9 @@ public class TestKeyManagerImpl {
 
     OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
         ACLType.READ, ACCESS);
-    prefixManager.addAcl(ozPrefix1, ozAcl1);
+    writeClient.addAcl(ozPrefix1, ozAcl1);
 
-    List<OzoneAcl> ozAclGet = prefixManager.getAcl(ozPrefix1);
+    List<OzoneAcl> ozAclGet = writeClient.getAcl(ozPrefix1);
     Assert.assertEquals(1, ozAclGet.size());
     Assert.assertEquals(ozAcl1, ozAclGet.get(0));
 
@@ -530,8 +544,8 @@ public class TestKeyManagerImpl {
     acls.add(ozAcl2);
     acls.add(ozAcl3);
 
-    prefixManager.setAcl(ozPrefix1, acls);
-    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    writeClient.setAcl(ozPrefix1, acls);
+    ozAclGet = writeClient.getAcl(ozPrefix1);
     Assert.assertEquals(2, ozAclGet.size());
 
     int matchEntries = 0;
@@ -547,27 +561,27 @@ public class TestKeyManagerImpl {
     }
     Assert.assertEquals(2, matchEntries);
 
-    boolean result = prefixManager.removeAcl(ozPrefix1, ozAcl4);
+    boolean result = writeClient.removeAcl(ozPrefix1, ozAcl4);
     Assert.assertEquals(true, result);
 
-    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    ozAclGet = writeClient.getAcl(ozPrefix1);
     Assert.assertEquals(2, ozAclGet.size());
 
-    result = prefixManager.removeAcl(ozPrefix1, ozAcl3);
+    result = writeClient.removeAcl(ozPrefix1, ozAcl3);
     Assert.assertEquals(true, result);
-    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    ozAclGet = writeClient.getAcl(ozPrefix1);
     Assert.assertEquals(1, ozAclGet.size());
 
     Assert.assertEquals(ozAcl2, ozAclGet.get(0));
 
     // add dev:w
-    prefixManager.addAcl(ozPrefix1, ozAcl4);
-    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    writeClient.addAcl(ozPrefix1, ozAcl4);
+    ozAclGet = writeClient.getAcl(ozPrefix1);
     Assert.assertEquals(2, ozAclGet.size());
 
     // add dev:r and validate the acl bitset combined
-    prefixManager.addAcl(ozPrefix1, ozAcl5);
-    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    writeClient.addAcl(ozPrefix1, ozAcl5);
+    ozAclGet = writeClient.getAcl(ozPrefix1);
     Assert.assertEquals(2, ozAclGet.size());
 
     matchEntries = 0;
@@ -708,10 +722,10 @@ public class TestKeyManagerImpl {
     }
 
     // create a file
-    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    OpenKeySession keySession = writeClient.createFile(keyArgs, false, false);
     keyArgs.setLocationInfoList(
         keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
-    keyManager.commitKey(keyArgs, keySession.getId());
+    writeClient.commitKey(keyArgs, keySession.getId());
     Assert.assertEquals(keyManager.lookupFile(keyArgs, null).getKeyName(),
         keyName);
 
@@ -752,7 +766,7 @@ public class TestKeyManagerImpl {
     }
 
     // create a key
-    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    OpenKeySession keySession = writeClient.createFile(keyArgs, false, false);
     // randomly select 3 datanodes
     List<DatanodeDetails> nodeList = new ArrayList<>();
     nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
@@ -776,7 +790,7 @@ public class TestKeyManagerImpl {
                 locationList.get(0).getLocalID())).build());
     keyArgs.setLocationInfoList(locationInfoList);
 
-    keyManager.commitKey(keyArgs, keySession.getId());
+    writeClient.commitKey(keyArgs, keySession.getId());
     ContainerInfo containerInfo = new ContainerInfo.Builder().setContainerID(1L)
         .setPipelineID(pipeline.getId()).build();
     List<ContainerWithPipeline> containerWithPipelines = Arrays.asList(
@@ -822,7 +836,7 @@ public class TestKeyManagerImpl {
   @Test
   public void testLatestLocationVersion() throws IOException {
     String keyName = RandomStringUtils.randomAlphabetic(5);
-    OmKeyArgs keyArgs = createBuilder()
+    OmKeyArgs keyArgs = createBuilder(VERSIONED_BUCKET_NAME)
         .setKeyName(keyName)
         .setLatestVersionLocation(true)
         .build();
@@ -838,7 +852,7 @@ public class TestKeyManagerImpl {
     }
 
     // create a key
-    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    OpenKeySession keySession = writeClient.createFile(keyArgs, false, false);
     // randomly select 3 datanodes
     List<DatanodeDetails> nodeList = new ArrayList<>();
     nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
@@ -862,12 +876,20 @@ public class TestKeyManagerImpl {
                 locationList.get(0).getLocalID())).build());
     keyArgs.setLocationInfoList(locationInfoList);
 
-    keyManager.commitKey(keyArgs, keySession.getId());
+    writeClient.commitKey(keyArgs, keySession.getId());
+    // Mock out the pipelines from the SCM
+    ContainerInfo containerInfo = new ContainerInfo.Builder().setContainerID(1L)
+        .setPipelineID(pipeline.getId()).build();
+    List<ContainerWithPipeline> containerWithPipelines = Arrays.asList(
+        new ContainerWithPipeline(containerInfo, pipeline));
+    when(mockScmContainerClient.getContainerWithPipelineBatch(
+        Arrays.asList(1L))).thenReturn(containerWithPipelines);
+
     OmKeyInfo key = keyManager.lookupKey(keyArgs, null);
     Assert.assertEquals(key.getKeyLocationVersions().size(), 1);
 
-    keySession = keyManager.createFile(keyArgs, true, true);
-    keyManager.commitKey(keyArgs, keySession.getId());
+    keySession = writeClient.createFile(keyArgs, true, true);
+    writeClient.commitKey(keyArgs, keySession.getId());
 
     // Test lookupKey (latestLocationVersion == true)
     key = keyManager.lookupKey(keyArgs, null);
@@ -889,7 +911,7 @@ public class TestKeyManagerImpl {
     key = keyManager.lookupFile(keyArgs, null);
     Assert.assertEquals(key.getKeyLocationVersions().size(), 1);
 
-    keyArgs = createBuilder()
+    keyArgs = createBuilder(VERSIONED_BUCKET_NAME)
         .setKeyName(keyName)
         .setLatestVersionLocation(false)
         .build();
@@ -972,17 +994,17 @@ public class TestKeyManagerImpl {
     String keyNameDir1 = "dir1";
     OmKeyArgs keyArgsDir1 =
         createBuilder().setKeyName(keyNameDir1).build();
-    keyManager.createDirectory(keyArgsDir1);
+    writeClient.createDirectory(keyArgsDir1);
 
     String keyNameDir1Subdir1 = "dir1" + OZONE_URI_DELIMITER + "subdir1";
     OmKeyArgs keyArgsDir1Subdir1 =
         createBuilder().setKeyName(keyNameDir1Subdir1).build();
-    keyManager.createDirectory(keyArgsDir1Subdir1);
+    writeClient.createDirectory(keyArgsDir1Subdir1);
 
     String keyNameDir2 = "dir2";
     OmKeyArgs keyArgsDir2 =
         createBuilder().setKeyName(keyNameDir2).build();
-    keyManager.createDirectory(keyArgsDir2);
+    writeClient.createDirectory(keyArgsDir2);
 
     OmKeyArgs rootDirArgs = createKeyArgs("");
     // Test listStatus with recursive=false, should only have dirs under root
@@ -1238,123 +1260,113 @@ public class TestKeyManagerImpl {
   @Test
   public void testRefreshPipeline() throws Exception {
 
-    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).build();
-    try {
-      cluster.waitForClusterToBeReady();
-      OzoneManager ozoneManager = cluster.getOzoneManager();
+    OzoneManager ozoneManager = om;
 
-      StorageContainerLocationProtocol sclProtocolMock = mock(
-          StorageContainerLocationProtocol.class);
+    StorageContainerLocationProtocol sclProtocolMock = mock(
+        StorageContainerLocationProtocol.class);
 
-      List<Long> containerIDs = new ArrayList<>();
-      containerIDs.add(100L);
-      containerIDs.add(200L);
+    List<Long> containerIDs = new ArrayList<>();
+    containerIDs.add(100L);
+    containerIDs.add(200L);
 
-      List<ContainerWithPipeline> cps = new ArrayList<>();
-      for (Long containerID : containerIDs) {
-        ContainerWithPipeline containerWithPipelineMock =
-            mock(ContainerWithPipeline.class);
-        when(containerWithPipelineMock.getPipeline())
-            .thenReturn(getRandomPipeline());
+    List<ContainerWithPipeline> cps = new ArrayList<>();
+    for (Long containerID : containerIDs) {
+      ContainerWithPipeline containerWithPipelineMock =
+          mock(ContainerWithPipeline.class);
+      when(containerWithPipelineMock.getPipeline())
+          .thenReturn(getRandomPipeline());
 
-        ContainerInfo ci = mock(ContainerInfo.class);
-        when(ci.getContainerID()).thenReturn(containerID);
-        when(containerWithPipelineMock.getContainerInfo()).thenReturn(ci);
+      ContainerInfo ci = mock(ContainerInfo.class);
+      when(ci.getContainerID()).thenReturn(containerID);
+      when(containerWithPipelineMock.getContainerInfo()).thenReturn(ci);
 
-        cps.add(containerWithPipelineMock);
-      }
+      cps.add(containerWithPipelineMock);
+    }
 
-      when(sclProtocolMock.getContainerWithPipelineBatch(containerIDs))
-          .thenReturn(cps);
+    when(sclProtocolMock.getContainerWithPipelineBatch(containerIDs))
+        .thenReturn(cps);
 
-      ScmClient scmClientMock = mock(ScmClient.class);
-      when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
+    ScmClient scmClientMock = mock(ScmClient.class);
+    when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
 
-      OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo("v1",
-          "b1", "k1", ReplicationType.RATIS,
-          ReplicationFactor.THREE);
+    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo("v1",
+        "b1", "k1", ReplicationType.RATIS,
+        ReplicationFactor.THREE);
 
-      // Add block to key.
-      List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
-      Pipeline pipeline = getRandomPipeline();
+    // Add block to key.
+    List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
+    Pipeline pipeline = getRandomPipeline();
 
-      OmKeyLocationInfo omKeyLocationInfo =
-          new OmKeyLocationInfo.Builder().setBlockID(
-              new BlockID(100L, 1000L))
-              .setOffset(0).setLength(100L).setPipeline(pipeline).build();
+    OmKeyLocationInfo omKeyLocationInfo =
+        new OmKeyLocationInfo.Builder().setBlockID(
+            new BlockID(100L, 1000L))
+            .setOffset(0).setLength(100L).setPipeline(pipeline).build();
 
-      omKeyLocationInfoList.add(omKeyLocationInfo);
+    omKeyLocationInfoList.add(omKeyLocationInfo);
 
-      OmKeyLocationInfo omKeyLocationInfo2 =
-          new OmKeyLocationInfo.Builder().setBlockID(
-              new BlockID(200L, 1000L))
-              .setOffset(0).setLength(100L).setPipeline(pipeline).build();
-      omKeyLocationInfoList.add(omKeyLocationInfo2);
+    OmKeyLocationInfo omKeyLocationInfo2 =
+        new OmKeyLocationInfo.Builder().setBlockID(
+            new BlockID(200L, 1000L))
+            .setOffset(0).setLength(100L).setPipeline(pipeline).build();
+    omKeyLocationInfoList.add(omKeyLocationInfo2);
 
-      OmKeyLocationInfo omKeyLocationInfo3 =
-          new OmKeyLocationInfo.Builder().setBlockID(
-              new BlockID(100L, 2000L))
-              .setOffset(0).setLength(100L).setPipeline(pipeline).build();
-      omKeyLocationInfoList.add(omKeyLocationInfo3);
+    OmKeyLocationInfo omKeyLocationInfo3 =
+        new OmKeyLocationInfo.Builder().setBlockID(
+            new BlockID(100L, 2000L))
+            .setOffset(0).setLength(100L).setPipeline(pipeline).build();
+    omKeyLocationInfoList.add(omKeyLocationInfo3);
 
-      omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
+    omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
 
-      KeyManagerImpl keyManagerImpl =
-          new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1");
+    KeyManagerImpl keyManagerImpl =
+        new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1");
 
-      keyManagerImpl.refresh(omKeyInfo);
+    keyManagerImpl.refresh(omKeyInfo);
+
+    verify(sclProtocolMock, times(1))
+        .getContainerWithPipelineBatch(containerIDs);
 
-      verify(sclProtocolMock, times(1))
-          .getContainerWithPipelineBatch(containerIDs);
-    } finally {
-      cluster.shutdown();
-    }
   }
 
 
   @Test
   public void testRefreshPipelineException() throws Exception {
-    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).build();
+
+    OzoneManager ozoneManager = om;
+
+    String errorMessage = "Cannot find container!!";
+    StorageContainerLocationProtocol sclProtocolMock = mock(
+        StorageContainerLocationProtocol.class);
+    doThrow(new IOException(errorMessage)).when(sclProtocolMock)
+        .getContainerWithPipelineBatch(anyList());
+
+    ScmClient scmClientMock = mock(ScmClient.class);
+    when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
+
+    OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo("v1",
+        "b1", "k1", ReplicationType.RATIS,
+        ReplicationFactor.THREE);
+
+    // Add block to key.
+    List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
+    Pipeline pipeline = getRandomPipeline();
+
+    OmKeyLocationInfo omKeyLocationInfo =
+        new OmKeyLocationInfo.Builder().setBlockID(
+            new BlockID(100L, 1000L))
+            .setOffset(0).setLength(100L).setPipeline(pipeline).build();
+    omKeyLocationInfoList.add(omKeyLocationInfo);
+    omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
+
+    KeyManagerImpl keyManagerImpl =
+        new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1");
+
     try {
-      cluster.waitForClusterToBeReady();
-      OzoneManager ozoneManager = cluster.getOzoneManager();
-
-      String errorMessage = "Cannot find container!!";
-      StorageContainerLocationProtocol sclProtocolMock = mock(
-          StorageContainerLocationProtocol.class);
-      doThrow(new IOException(errorMessage)).when(sclProtocolMock)
-          .getContainerWithPipelineBatch(anyList());
-
-      ScmClient scmClientMock = mock(ScmClient.class);
-      when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
-
-      OmKeyInfo omKeyInfo = TestOMRequestUtils.createOmKeyInfo("v1",
-          "b1", "k1", ReplicationType.RATIS,
-          ReplicationFactor.THREE);
-
-      // Add block to key.
-      List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
-      Pipeline pipeline = getRandomPipeline();
-
-      OmKeyLocationInfo omKeyLocationInfo =
-          new OmKeyLocationInfo.Builder().setBlockID(
-              new BlockID(100L, 1000L))
-              .setOffset(0).setLength(100L).setPipeline(pipeline).build();
-      omKeyLocationInfoList.add(omKeyLocationInfo);
-      omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
-
-      KeyManagerImpl keyManagerImpl =
-          new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1");
-
-      try {
-        keyManagerImpl.refresh(omKeyInfo);
-        Assert.fail();
-      } catch (OMException omEx) {
-        Assert.assertEquals(SCM_GET_PIPELINE_EXCEPTION, omEx.getResult());
-        Assert.assertTrue(omEx.getMessage().equals(errorMessage));
-      }
-    } finally {
-      cluster.shutdown();
+      keyManagerImpl.refresh(omKeyInfo);
+      Assert.fail();
+    } catch (OMException omEx) {
+      Assert.assertEquals(SCM_GET_PIPELINE_EXCEPTION, omEx.getResult());
+      Assert.assertTrue(omEx.getMessage().equals(errorMessage));
     }
   }
 
@@ -1385,7 +1397,7 @@ public class TestKeyManagerImpl {
       throws IOException {
     // create super directory
     OmKeyArgs superDirArgs = createKeyArgs(superDir);
-    keyManager.createDirectory(superDirArgs);
+    writeClient.createDirectory(superDirArgs);
     directorySet.add(superDir);
 
     // add directory children to super directory
@@ -1455,7 +1467,7 @@ public class TestKeyManagerImpl {
     for (int i = 0; i < numDirectories; i++) {
       String keyName = parent + "/" + RandomStringUtils.randomAlphabetic(5);
       OmKeyArgs keyArgs = createBuilder().setKeyName(keyName).build();
-      keyManager.createDirectory(keyArgs);
+      writeClient.createDirectory(keyArgs);
       keyNames.add(keyName);
     }
     directoryMap.put(parent, new ArrayList<>(keyNames));
@@ -1468,11 +1480,11 @@ public class TestKeyManagerImpl {
     for (int i = 0; i < numFiles; i++) {
       String keyName = parent + "/" + RandomStringUtils.randomAlphabetic(5);
       OmKeyArgs keyArgs = createBuilder().setKeyName(keyName).build();
-      OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+      OpenKeySession keySession = writeClient.createFile(keyArgs, false, false);
       keyArgs.setLocationInfoList(
           keySession.getKeyInfo().getLatestVersionLocations()
               .getLocationList());
-      keyManager.commitKey(keyArgs, keySession.getId());
+      writeClient.commitKey(keyArgs, keySession.getId());
       keyNames.add(keyName);
     }
     fileMap.put(parent, keyNames);
@@ -1480,9 +1492,14 @@ public class TestKeyManagerImpl {
   }
 
   private OmKeyArgs.Builder createBuilder() throws IOException {
+    return createBuilder(BUCKET_NAME);
+  }
+
+  private OmKeyArgs.Builder createBuilder(String bucketName)
+      throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     return new OmKeyArgs.Builder()
-        .setBucketName(BUCKET_NAME)
+        .setBucketName(bucketName)
         .setDataSize(0)
         .setReplicationConfig(
             new StandaloneReplicationConfig(ONE))
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index b569b5d..f073dce 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -17,19 +17,12 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -56,44 +49,6 @@ public interface KeyManager extends OzoneManagerFS, 
IOzoneAcl {
   void stop() throws IOException;
 
   /**
-   * After calling commit, the key will be made visible. There can be multiple
-   * open key writes in parallel (identified by client id). The most recently
-   * committed one will be the one visible.
-   *
-   * @param args the key to commit.
-   * @param clientID the client that is committing.
-   * @throws IOException
-   */
-  void commitKey(OmKeyArgs args, long clientID) throws IOException;
-
-  /**
-   * A client calls this on an open key, to request to allocate a new block,
-   * and appended to the tail of current block list of the open client.
-   *
-   * @param args the key to append
-   * @param clientID the client requesting block.
-   * @param excludeList List of datanodes/containers to exclude during block
-   *                    allocation.
-   * @return the reference to the new block.
-   * @throws IOException
-   */
-  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
-      ExcludeList excludeList) throws IOException;
-
-  /**
-   * Given the args of a key to put, write an open key entry to meta data.
-   *
-   * In case that the container creation or key write failed on
-   * DistributedStorageHandler, this key's metadata will still stay in OM.
-   * TODO garbage collect the open keys that never get closed
-   *
-   * @param args the args of the key provided by client.
-   * @return a OpenKeySession instance client uses to talk to container.
-   * @throws IOException
-   */
-  OpenKeySession openKey(OmKeyArgs args) throws IOException;
-
-  /**
    * Look up an existing key. Return the info of the key to client side, which
    * DistributedStorageHandler will use to access the data on datanode.
    *
@@ -105,26 +60,6 @@ public interface KeyManager extends OzoneManagerFS, 
IOzoneAcl {
    */
   OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) throws IOException;
 
-  /**
-   * Renames an existing key within a bucket.
-   *
-   * @param args the args of the key provided by client.
-   * @param toKeyName New name to be used for the key
-   * @throws IOException if specified key doesn't exist or
-   * some other I/O errors while renaming the key.
-   */
-  void renameKey(OmKeyArgs args, String toKeyName) throws IOException;
-
-  /**
-   * Deletes an object by an object key. The key will be immediately removed
-   * from OM namespace and become invisible to clients. The object data
-   * will be removed in async manner that might retain for some time.
-   *
-   * @param args the args of the key provided by client.
-   * @throws IOException if specified key doesn't exist or
-   * some other I/O errors while deleting an object.
-   */
-  void deleteKey(OmKeyArgs args) throws IOException;
 
   /**
    * Returns a list of keys represented by {@link OmKeyInfo}
@@ -193,16 +128,6 @@ public interface KeyManager extends OzoneManagerFS, 
IOzoneAcl {
   List<String> getExpiredOpenKeys(int count) throws IOException;
 
   /**
-   * Deletes a expired open key by its name. Called when a hanging key has been
-   * lingering for too long. Once called, the open key entries gets removed
-   * from OM mdata data.
-   *
-   * @param objectKeyName object key name with #open# prefix.
-   * @throws IOException if specified key doesn't exist or other I/O errors.
-   */
-  void deleteExpiredOpenKey(String objectKeyName) throws IOException;
-
-  /**
    * Returns the metadataManager.
    * @return OMMetadataManager.
    */
@@ -215,42 +140,6 @@ public interface KeyManager extends OzoneManagerFS, 
IOzoneAcl {
   BackgroundService getDeletingService();
 
 
-  /**
-   * Initiate multipart upload for the specified key.
-   * @param keyArgs
-   * @return MultipartInfo
-   * @throws IOException
-   */
-  OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws 
IOException;
-
-  /**
-   * Commit Multipart upload part file.
-   * @param omKeyArgs
-   * @param clientID
-   * @return OmMultipartCommitUploadPartInfo
-   * @throws IOException
-   */
-
-  OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
-      OmKeyArgs omKeyArgs, long clientID) throws IOException;
-
-  /**
-   * Complete Multipart upload Request.
-   * @param omKeyArgs
-   * @param multipartUploadList
-   * @return OmMultipartUploadCompleteInfo
-   * @throws IOException
-   */
-  OmMultipartUploadCompleteInfo completeMultipartUpload(OmKeyArgs omKeyArgs,
-      OmMultipartUploadCompleteList multipartUploadList) throws IOException;
-
-  /**
-   * Abort multipart upload request.
-   * @param omKeyArgs
-   * @throws IOException
-   */
-  void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException;
-
   OmMultipartUploadList listMultipartUploads(String volumeName,
       String bucketName, String prefix) throws OMException;
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d64607e..b8857fd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -37,7 +37,6 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -45,24 +44,17 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.utils.BackgroundService;
-import org.apache.hadoop.hdds.utils.UniqueId;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
-import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -71,7 +63,6 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@@ -82,17 +73,11 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
-import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
@@ -112,14 +97,12 @@ import org.apache.hadoop.util.Time;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
@@ -132,7 +115,6 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BL
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
@@ -312,92 +294,6 @@ public class KeyManagerImpl implements KeyManager {
     }
     return omBucketInfo;
   }
-
-  @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
-      ExcludeList excludeList) throws IOException {
-    Preconditions.checkNotNull(args);
-
-
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);
-    String openKey = metadataManager.getOpenKey(
-        volumeName, bucketName, keyName, clientID);
-
-    OmKeyInfo keyInfo =
-        metadataManager.getOpenKeyTable(getBucketLayout()).get(openKey);
-    if (keyInfo == null) {
-      LOG.error("Allocate block for a key not in open status in meta store" +
-          " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
-      throw new OMException("Open Key not found",
-          KEY_NOT_FOUND);
-    }
-
-    // current version not committed, so new blocks coming now are added to
-    // the same version
-    List<OmKeyLocationInfo> locationInfos =
-        allocateBlock(keyInfo, excludeList, scmBlockSize);
-
-    keyInfo.appendNewBlocks(locationInfos, true);
-    keyInfo.updateModifcationTime();
-    metadataManager.getOpenKeyTable(getBucketLayout()).put(openKey, keyInfo);
-
-    return locationInfos.get(0);
-
-  }
-
-  /**
-   * This methods avoids multiple rpc calls to SCM by allocating multiple 
blocks
-   * in one rpc call.
-   * @param keyInfo - key info for key to be allocated.
-   * @param requestedSize requested length for allocation.
-   * @param excludeList exclude list while allocating blocks.
-   * @param requestedSize requested size to be allocated.
-   * @return
-   * @throws IOException
-   */
-  private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
-      ExcludeList excludeList, long requestedSize) throws IOException {
-    int numBlocks = Math.min((int) ((requestedSize - 1) / scmBlockSize + 1),
-        preallocateBlocksMax);
-    List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks);
-    String remoteUser = getRemoteUser().getShortUserName();
-    List<AllocatedBlock> allocatedBlocks;
-    try {
-      allocatedBlocks = scmClient.getBlockClient()
-          .allocateBlock(
-              scmBlockSize,
-              numBlocks,
-              keyInfo.getReplicationConfig(),
-              omId,
-              excludeList);
-
-    } catch (SCMException ex) {
-      if (ex.getResult()
-          .equals(SCMException.ResultCodes.SAFE_MODE_EXCEPTION)) {
-        throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_SAFE_MODE);
-      }
-      throw ex;
-    }
-    for (AllocatedBlock allocatedBlock : allocatedBlocks) {
-      BlockID blockID = new BlockID(allocatedBlock.getBlockID());
-      OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
-          .setBlockID(blockID)
-          .setLength(scmBlockSize)
-          .setOffset(0)
-          .setPipeline(allocatedBlock.getPipeline());
-      if (grpcBlockTokenEnabled) {
-        builder.setToken(secretManager
-            .generateToken(remoteUser, blockID,
-                EnumSet.of(READ, WRITE), scmBlockSize));
-      }
-      locationInfos.add(builder.build());
-    }
-    return locationInfos;
-  }
-
   /* Optimize ugi lookup for RPC operations to avoid a trip through
    * UGI.getCurrentUser which is synch'ed.
    */
@@ -428,220 +324,6 @@ public class KeyManagerImpl implements KeyManager {
     Preconditions.checkNotNull(edek);
     return edek;
   }
-
-  @Override
-  public OpenKeySession openKey(OmKeyArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    Preconditions.checkNotNull(args.getAcls(), "Default acls " +
-        "should be set.");
-
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);
-
-    long currentTime = UniqueId.next();
-    OmKeyInfo keyInfo;
-    long openVersion;
-    // NOTE size of a key is not a hard limit on anything, it is a value that
-    // client should expect, in terms of current size of key. If client sets
-    // a value, then this value is used, otherwise, we allocate a single
-    // block which is the current size, if read by the client.
-    final long size = args.getDataSize() > 0 ?
-        args.getDataSize() : scmBlockSize;
-    final List<OmKeyLocationInfo> locations = new ArrayList<>();
-
-    String dbKeyName = metadataManager.getOzoneKey(
-        args.getVolumeName(), args.getBucketName(), args.getKeyName());
-
-    FileEncryptionInfo encInfo;
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    OmBucketInfo bucketInfo;
-    try {
-      bucketInfo = getBucketInfo(volumeName, bucketName);
-      encInfo = getFileEncryptionInfo(bucketInfo);
-      keyInfo = prepareKeyInfo(args, dbKeyName, size, locations, encInfo);
-    } catch (OMException e) {
-      throw e;
-    } catch (IOException ex) {
-      LOG.error("Key open failed for volume:{} bucket:{} key:{}",
-          volumeName, bucketName, keyName, ex);
-      throw new OMException(ex.getMessage(), ResultCodes.KEY_ALLOCATION_ERROR);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-    if (keyInfo == null) {
-      // the key does not exist, create a new object, the new blocks are the
-      // version 0
-      keyInfo = createKeyInfo(args, locations, args.getReplicationConfig(),
-              size, encInfo, bucketInfo);
-    }
-    openVersion = keyInfo.getLatestVersionLocations().getVersion();
-    LOG.debug("Key {} allocated in volume {} bucket {}",
-        keyName, volumeName, bucketName);
-    allocateBlockInKey(keyInfo, size, currentTime);
-    return new OpenKeySession(currentTime, keyInfo, openVersion);
-  }
-
-  private void allocateBlockInKey(OmKeyInfo keyInfo, long size, long sessionId)
-      throws IOException {
-    String openKey = metadataManager
-        .getOpenKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
-            keyInfo.getKeyName(), sessionId);
-    // requested size is not required but more like a optimization:
-    // SCM looks at the requested, if it 0, no block will be allocated at
-    // the point, if client needs more blocks, client can always call
-    // allocateBlock. But if requested size is not 0, OM will preallocate
-    // some blocks and piggyback to client, to save RPC calls.
-    if (size > 0) {
-      List<OmKeyLocationInfo> locationInfos =
-          allocateBlock(keyInfo, new ExcludeList(), size);
-      keyInfo.appendNewBlocks(locationInfos, true);
-    }
-
-    metadataManager.getOpenKeyTable(getBucketLayout()).put(openKey, keyInfo);
-
-  }
-
-  private OmKeyInfo prepareKeyInfo(
-      OmKeyArgs keyArgs, String dbKeyName, long size,
-      List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo)
-      throws IOException {
-    OmKeyInfo keyInfo = null;
-    if (keyArgs.getIsMultipartKey()) {
-      keyInfo = prepareMultipartKeyInfo(keyArgs, size, locations, encInfo);
-    } else if (metadataManager.getKeyTable(
-        getBucketLayout(metadataManager, keyArgs.getVolumeName(),
-            keyArgs.getBucketName())).isExist(dbKeyName)) {
-      keyInfo = metadataManager.getKeyTable(
-          getBucketLayout(metadataManager, keyArgs.getVolumeName(),
-              keyArgs.getBucketName())).get(dbKeyName);
-      // the key already exist, the new blocks will be added as new version
-      // when locations.size = 0, the new version will have identical blocks
-      // as its previous version
-      keyInfo.addNewVersion(locations, true, true);
-      keyInfo.setDataSize(size + keyInfo.getDataSize());
-    }
-    if(keyInfo != null) {
-      keyInfo.setMetadata(keyArgs.getMetadata());
-    }
-    return keyInfo;
-  }
-
-  private OmKeyInfo prepareMultipartKeyInfo(OmKeyArgs args, long size,
-      List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo)
-      throws IOException {
-
-    Preconditions.checkArgument(args.getMultipartUploadPartNumber() > 0,
-        "PartNumber Should be greater than zero");
-    // When key is multipart upload part key, we should take replication
-    // type and replication factor from original key which has done
-    // initiate multipart upload. If we have not found any such, we throw
-    // error no such multipart upload.
-    String uploadID = args.getMultipartUploadID();
-    Preconditions.checkNotNull(uploadID);
-    String multipartKey = metadataManager
-        .getMultipartKey(args.getVolumeName(), args.getBucketName(),
-            args.getKeyName(), uploadID);
-    OmKeyInfo partKeyInfo =
-        metadataManager.getOpenKeyTable(getBucketLayout()).get(multipartKey);
-    if (partKeyInfo == null) {
-      throw new OMException("No such Multipart upload is with specified " +
-          "uploadId " + uploadID,
-          ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
-    }
-
-    // For this upload part we don't need to check in KeyTable. As this
-    // is not an actual key, it is a part of the key.
-    return createKeyInfo(args, locations,
-            partKeyInfo.getReplicationConfig(), size, encInfo,
-        getBucketInfo(args.getVolumeName(), args.getBucketName()));
-  }
-
-  /**
-   * Create OmKeyInfo object.
-   * @param keyArgs
-   * @param locations
-   * @param replicationConfig
-   * @param size
-   * @param encInfo
-   * @param omBucketInfo
-   * @return
-   */
-  private OmKeyInfo createKeyInfo(OmKeyArgs keyArgs,
-      List<OmKeyLocationInfo> locations,
-      ReplicationConfig replicationConfig, long size,
-      FileEncryptionInfo encInfo,
-      OmBucketInfo omBucketInfo) {
-    OmKeyInfo.Builder builder = new OmKeyInfo.Builder()
-        .setVolumeName(keyArgs.getVolumeName())
-        .setBucketName(keyArgs.getBucketName())
-        .setKeyName(keyArgs.getKeyName())
-        .setOmKeyLocationInfos(Collections.singletonList(
-            new OmKeyLocationInfoGroup(0, locations)))
-        .setCreationTime(Time.now())
-        .setModificationTime(Time.now())
-        .setDataSize(size)
-        .setReplicationConfig(replicationConfig)
-        .setFileEncryptionInfo(encInfo)
-        .addAllMetadata(keyArgs.getMetadata());
-    builder.setAcls(getAclsForKey(keyArgs, omBucketInfo));
-
-    if(Boolean.valueOf(omBucketInfo.getMetadata().get(OzoneConsts.GDPR_FLAG))) 
{
-      builder.addMetadata(OzoneConsts.GDPR_FLAG, Boolean.TRUE.toString());
-    }
-    return builder.build();
-  }
-
-  @Override
-  public void commitKey(OmKeyArgs args, long clientID) throws IOException {
-    Preconditions.checkNotNull(args);
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
-    String objectKey = metadataManager
-        .getOzoneKey(volumeName, bucketName, keyName);
-    String openKey = metadataManager
-        .getOpenKey(volumeName, bucketName, keyName, clientID);
-    Preconditions.checkNotNull(locationInfoList);
-    try {
-      metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-      OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);
-      OmKeyInfo keyInfo =
-          metadataManager.getOpenKeyTable(getBucketLayout()).get(openKey);
-      if (keyInfo == null) {
-        throw new OMException("Failed to commit key, as " + openKey + "entry " 
+
-            "is not found in the openKey table", KEY_NOT_FOUND);
-      }
-      keyInfo.setDataSize(args.getDataSize());
-      keyInfo.setModificationTime(Time.now());
-
-      //update the block length for each block
-      keyInfo.updateLocationInfoList(locationInfoList, false);
-      metadataManager.getStore().move(
-          openKey,
-          objectKey,
-          keyInfo,
-          metadataManager.getOpenKeyTable(getBucketLayout()), metadataManager
-              .getKeyTable(
-                  getBucketLayout(metadataManager, volumeName, bucketName)));
-    } catch (OMException e) {
-      throw e;
-    } catch (IOException ex) {
-      LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
-          volumeName, bucketName, keyName, ex);
-      throw new OMException(ex.getMessage(),
-          ResultCodes.KEY_ALLOCATION_ERROR);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-  }
-
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress)
       throws IOException {
@@ -835,135 +517,6 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
-  @Override
-  public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
-    Preconditions.checkNotNull(args);
-    Preconditions.checkNotNull(toKeyName);
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String fromKeyName = args.getKeyName();
-    if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
-      LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} 
toKey:{}",
-          volumeName, bucketName, fromKeyName, toKeyName);
-      throw new OMException("Key name is empty",
-          ResultCodes.INVALID_KEY_NAME);
-    }
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    try {
-      // fromKeyName should exist
-      String fromKey = metadataManager.getOzoneKey(
-          volumeName, bucketName, fromKeyName);
-      OmKeyInfo fromKeyValue = metadataManager
-          .getKeyTable(getBucketLayout(metadataManager, volumeName, 
bucketName))
-          .get(fromKey);
-      if (fromKeyValue == null) {
-        // TODO: Add support for renaming open key
-        LOG.error(
-            "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
-                + "Key: {} not found.", volumeName, bucketName, fromKeyName,
-            toKeyName, fromKeyName);
-        throw new OMException("Key not found",
-            KEY_NOT_FOUND);
-      }
-
-      // A rename is a no-op if the target and source name is same.
-      // TODO: Discuss if we need to throw?.
-      if (fromKeyName.equals(toKeyName)) {
-        return;
-      }
-
-      // toKeyName should not exist
-      String toKey =
-          metadataManager.getOzoneKey(volumeName, bucketName, toKeyName);
-      OmKeyInfo toKeyValue = metadataManager
-          .getKeyTable(getBucketLayout(metadataManager, volumeName, 
bucketName))
-          .get(toKey);
-      if (toKeyValue != null) {
-        LOG.error(
-            "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
-                + "Key: {} already exists.", volumeName, bucketName,
-            fromKeyName, toKeyName, toKeyName);
-        throw new OMException("Key already exists",
-            OMException.ResultCodes.KEY_ALREADY_EXISTS);
-      }
-
-      fromKeyValue.setKeyName(toKeyName);
-      fromKeyValue.updateModifcationTime();
-      DBStore store = metadataManager.getStore();
-      try (BatchOperation batch = store.initBatchOperation()) {
-        metadataManager.getKeyTable(
-            getBucketLayout(metadataManager, volumeName, bucketName))
-            .deleteWithBatch(batch, fromKey);
-        metadataManager.getKeyTable(
-            getBucketLayout(metadataManager, volumeName, bucketName))
-            .putWithBatch(batch, toKey, fromKeyValue);
-        store.commitBatchOperation(batch);
-      }
-    } catch (IOException ex) {
-      if (ex instanceof OMException) {
-        throw ex;
-      }
-      LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} 
toKey:{}",
-          volumeName, bucketName, fromKeyName, toKeyName, ex);
-      throw new OMException(ex.getMessage(),
-          ResultCodes.KEY_RENAME_ERROR);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-  }
-
-  @Override
-  public void deleteKey(OmKeyArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    try {
-      String objectKey = metadataManager.getOzoneKey(
-          volumeName, bucketName, keyName);
-      OmKeyInfo keyInfo = metadataManager
-          .getKeyTable(getBucketLayout(metadataManager, volumeName, 
bucketName))
-          .get(objectKey);
-      if (keyInfo == null) {
-        throw new OMException("Key not found",
-            KEY_NOT_FOUND);
-      } else {
-        // directly delete key with no blocks from db. This key need not be
-        // moved to deleted table.
-        if (isKeyEmpty(keyInfo)) {
-          metadataManager.getKeyTable(
-              getBucketLayout(metadataManager, volumeName, bucketName))
-              .delete(objectKey);
-          LOG.debug("Key {} deleted from OM DB", keyName);
-          return;
-        }
-      }
-      RepeatedOmKeyInfo repeatedOmKeyInfo =
-          metadataManager.getDeletedTable().get(objectKey);
-      repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(keyInfo,
-          repeatedOmKeyInfo, 0L, false);
-      metadataManager
-          .getKeyTable(getBucketLayout(metadataManager, volumeName, 
bucketName))
-          .delete(objectKey);
-      metadataManager.getDeletedTable().put(objectKey, repeatedOmKeyInfo);
-    } catch (OMException ex) {
-      throw ex;
-    } catch (IOException ex) {
-      LOG.error(String.format("Delete key failed for volume:%s "
-          + "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
-      throw new OMException(ex.getMessage(), ex,
-          ResultCodes.KEY_DELETION_ERROR);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-  }
-
   private boolean isKeyEmpty(OmKeyInfo keyInfo) {
     for (OmKeyLocationInfoGroup keyLocationList : keyInfo
         .getKeyLocationVersions()) {
@@ -1029,12 +582,6 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public void deleteExpiredOpenKey(String objectKeyName) throws IOException {
-    Preconditions.checkNotNull(objectKeyName);
-    // TODO: Fix this in later patches.
-  }
-
-  @Override
   public OMMetadataManager getMetadataManager() {
     return metadataManager;
   }
@@ -1050,355 +597,6 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
-      IOException {
-    Preconditions.checkNotNull(omKeyArgs);
-    String uploadID = UUID.randomUUID().toString() + "-" + UniqueId.next();
-    return createMultipartInfo(omKeyArgs, uploadID);
-  }
-
-  private OmMultipartInfo createMultipartInfo(OmKeyArgs keyArgs,
-      String multipartUploadID) throws IOException {
-    String volumeName = keyArgs.getVolumeName();
-    String bucketName = keyArgs.getBucketName();
-    String keyName = keyArgs.getKeyName();
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    OmBucketInfo bucketInfo = validateS3Bucket(volumeName, bucketName);
-    try {
-
-      // We are adding uploadId to key, because if multiple users try to
-      // perform multipart upload on the same key, each will try to upload, who
-      // ever finally commit the key, we see that key in ozone. Suppose if we
-      // don't add id, and use the same key /volume/bucket/key, when multiple
-      // users try to upload the key, we update the parts of the key's from
-      // multiple users to same key, and the key output can be a mix of the
-      // parts from multiple users.
-
-      // So on same key if multiple time multipart upload is initiated we
-      // store multiple entries in the openKey Table.
-      // Checked AWS S3, when we try to run multipart upload, each time a
-      // new uploadId is returned.
-
-      String multipartKey = metadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, multipartUploadID);
-
-      // Not checking if there is an already key for this in the keyTable, as
-      // during final complete multipart upload we take care of this.
-
-      long currentTime = Time.now();
-      Map<Integer, PartKeyInfo> partKeyInfoMap = new HashMap<>();
-      OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo.Builder()
-          .setUploadID(multipartUploadID)
-          .setCreationTime(currentTime)
-          .setReplicationConfig(keyArgs.getReplicationConfig())
-          .setPartKeyInfoList(partKeyInfoMap)
-          .build();
-      Map<Long, List<OmKeyLocationInfo>> locations = new HashMap<>();
-      OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
-          .setVolumeName(keyArgs.getVolumeName())
-          .setBucketName(keyArgs.getBucketName())
-          .setKeyName(keyArgs.getKeyName())
-          .setCreationTime(currentTime)
-          .setModificationTime(currentTime)
-          .setReplicationConfig(keyArgs.getReplicationConfig())
-          .setOmKeyLocationInfos(Collections.singletonList(
-              new OmKeyLocationInfoGroup(0, locations)))
-          .setAcls(getAclsForKey(keyArgs, bucketInfo))
-          .build();
-      DBStore store = metadataManager.getStore();
-      try (BatchOperation batch = store.initBatchOperation()) {
-        // Create an entry in open key table and multipart info table for
-        // this key.
-        metadataManager.getMultipartInfoTable().putWithBatch(batch,
-            multipartKey, multipartKeyInfo);
-        metadataManager.getOpenKeyTable(getBucketLayout()).putWithBatch(batch,
-            multipartKey, omKeyInfo);
-        store.commitBatchOperation(batch);
-        return new OmMultipartInfo(volumeName, bucketName, keyName,
-            multipartUploadID);
-      }
-    } catch (IOException ex) {
-      LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " +
-          "key:{}", volumeName, bucketName, keyName, ex);
-      throw new OMException(ex.getMessage(),
-          ResultCodes.INITIATE_MULTIPART_UPLOAD_ERROR);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-  }
-
-  private List<OzoneAcl> getAclsForKey(OmKeyArgs keyArgs,
-      OmBucketInfo bucketInfo) {
-    List<OzoneAcl> acls = new ArrayList<>();
-
-    if(keyArgs.getAcls() != null) {
-      acls.addAll(keyArgs.getAcls());
-    }
-
-    // Inherit DEFAULT acls from prefix.
-    if(prefixManager != null) {
-      List<OmPrefixInfo> prefixList = prefixManager.getLongestPrefixPath(
-          OZONE_URI_DELIMITER +
-              keyArgs.getVolumeName() + OZONE_URI_DELIMITER +
-              keyArgs.getBucketName() + OZONE_URI_DELIMITER +
-              keyArgs.getKeyName());
-
-      if (!prefixList.isEmpty()) {
-        // Add all acls from direct parent to key.
-        OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1);
-        if(prefixInfo  != null) {
-          if (OzoneAclUtil.inheritDefaultAcls(acls, prefixInfo.getAcls())) {
-            return acls;
-          }
-        }
-      }
-    }
-
-    // Inherit DEFAULT acls from bucket only if DEFAULT acls for
-    // prefix are not set.
-    if (bucketInfo != null) {
-      if (OzoneAclUtil.inheritDefaultAcls(acls, bucketInfo.getAcls())) {
-        return acls;
-      }
-    }
-
-    // TODO: do we need to further fallback to volume default ACL
-    return acls;
-  }
-
-  @Override
-  public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
-      OmKeyArgs omKeyArgs, long clientID) throws IOException {
-    Preconditions.checkNotNull(omKeyArgs);
-    String volumeName = omKeyArgs.getVolumeName();
-    String bucketName = omKeyArgs.getBucketName();
-    String keyName = omKeyArgs.getKeyName();
-    String uploadID = omKeyArgs.getMultipartUploadID();
-    int partNumber = omKeyArgs.getMultipartUploadPartNumber();
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    validateS3Bucket(volumeName, bucketName);
-    String partName;
-    try {
-      String multipartKey = metadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, uploadID);
-      OmMultipartKeyInfo multipartKeyInfo = metadataManager
-          .getMultipartInfoTable().get(multipartKey);
-
-      String openKey = metadataManager.getOpenKey(
-          volumeName, bucketName, keyName, clientID);
-      OmKeyInfo keyInfo =
-          metadataManager.getOpenKeyTable(getBucketLayout()).get(openKey);
-
-      // set the data size and location info list
-      keyInfo.setDataSize(omKeyArgs.getDataSize());
-      keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList(), true);
-
-      partName = metadataManager.getOzoneKey(volumeName, bucketName, keyName)
-          + clientID;
-      if (multipartKeyInfo == null) {
-        // This can occur when user started uploading part by the time commit
-        // of that part happens, in between the user might have requested
-        // abort multipart upload. If we just throw exception, then the data
-        // will not be garbage collected, so move this part to delete table
-        // and throw error
-        // Move this part to delete table.
-        RepeatedOmKeyInfo repeatedOmKeyInfo =
-            metadataManager.getDeletedTable().get(partName);
-        repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-            keyInfo, repeatedOmKeyInfo, 0L, false);
-        metadataManager.getDeletedTable().put(partName, repeatedOmKeyInfo);
-        throw new OMException("No such Multipart upload is with specified " +
-            "uploadId " + uploadID, ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
-      } else {
-        PartKeyInfo oldPartKeyInfo =
-            multipartKeyInfo.getPartKeyInfo(partNumber);
-        PartKeyInfo.Builder partKeyInfo = PartKeyInfo.newBuilder();
-        partKeyInfo.setPartName(partName);
-        partKeyInfo.setPartNumber(partNumber);
-        // TODO remove unused write code path
-        partKeyInfo.setPartKeyInfo(keyInfo.getProtobuf(CURRENT_VERSION));
-        multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());
-        if (oldPartKeyInfo == null) {
-          // This is the first time part is being added.
-          DBStore store = metadataManager.getStore();
-          try (BatchOperation batch = store.initBatchOperation()) {
-            metadataManager.getOpenKeyTable(getBucketLayout())
-                .deleteWithBatch(batch, openKey);
-            metadataManager.getMultipartInfoTable().putWithBatch(batch,
-                multipartKey, multipartKeyInfo);
-            store.commitBatchOperation(batch);
-          }
-        } else {
-          // If we have this part already, that means we are overriding it.
-          // We need to 3 steps.
-          // Add the old entry to delete table.
-          // Remove the new entry from openKey table.
-          // Add the new entry in to the list of part keys.
-          DBStore store = metadataManager.getStore();
-          try (BatchOperation batch = store.initBatchOperation()) {
-            OmKeyInfo partKey = OmKeyInfo.getFromProtobuf(
-                oldPartKeyInfo.getPartKeyInfo());
-
-            RepeatedOmKeyInfo repeatedOmKeyInfo =
-                metadataManager.getDeletedTable()
-                    .get(oldPartKeyInfo.getPartName());
-
-            repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-                partKey, repeatedOmKeyInfo, 0L, false);
-
-            metadataManager.getDeletedTable().put(partName, repeatedOmKeyInfo);
-            metadataManager.getDeletedTable().putWithBatch(batch,
-                oldPartKeyInfo.getPartName(),
-                repeatedOmKeyInfo);
-            metadataManager.getOpenKeyTable(getBucketLayout())
-                .deleteWithBatch(batch, openKey);
-            metadataManager.getMultipartInfoTable().putWithBatch(batch,
-                multipartKey, multipartKeyInfo);
-            store.commitBatchOperation(batch);
-          }
-        }
-      }
-    } catch (IOException ex) {
-      LOG.error("Upload part Failed: volume:{} bucket:{} " +
-          "key:{} PartNumber: {}", volumeName, bucketName, keyName,
-          partNumber, ex);
-      throw new OMException(ex.getMessage(),
-          ResultCodes.MULTIPART_UPLOAD_PARTFILE_ERROR);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-
-    return new OmMultipartCommitUploadPartInfo(partName);
-
-  }
-
-  @Override
-  @SuppressWarnings("methodlength")
-  public OmMultipartUploadCompleteInfo completeMultipartUpload(
-      OmKeyArgs omKeyArgs, OmMultipartUploadCompleteList multipartUploadList)
-      throws IOException {
-    Preconditions.checkNotNull(omKeyArgs);
-    Preconditions.checkNotNull(multipartUploadList);
-    String volumeName = omKeyArgs.getVolumeName();
-    String bucketName = omKeyArgs.getBucketName();
-    String keyName = omKeyArgs.getKeyName();
-    String uploadID = omKeyArgs.getMultipartUploadID();
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    validateS3Bucket(volumeName, bucketName);
-    try {
-      String multipartKey = metadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, uploadID);
-
-      OmMultipartKeyInfo multipartKeyInfo = metadataManager
-          .getMultipartInfoTable().get(multipartKey);
-      if (multipartKeyInfo == null) {
-        throw new OMException("Complete Multipart Upload Failed: volume: " +
-            volumeName + "bucket: " + bucketName + "key: " + keyName,
-            ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
-      }
-      //TODO: Actual logic has been removed from this, and the old code has a
-      // bug. New code for this is in S3MultipartUploadCompleteRequest.
-      // This code will be cleaned up as part of HDDS-2353.
-
-      return new OmMultipartUploadCompleteInfo(omKeyArgs.getVolumeName(),
-          omKeyArgs.getBucketName(), omKeyArgs.getKeyName(), DigestUtils
-              .sha256Hex(keyName));
-    } catch (OMException ex) {
-      throw ex;
-    } catch (IOException ex) {
-      LOG.error("Complete Multipart Upload Failed: volume: " + volumeName +
-          "bucket: " + bucketName + "key: " + keyName, ex);
-      throw new OMException(ex.getMessage(), ResultCodes
-          .COMPLETE_MULTIPART_UPLOAD_ERROR);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-  }
-
-  @Override
-  public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
-
-    Preconditions.checkNotNull(omKeyArgs);
-    String volumeName = omKeyArgs.getVolumeName();
-    String bucketName = omKeyArgs.getBucketName();
-    String keyName = omKeyArgs.getKeyName();
-    String uploadID = omKeyArgs.getMultipartUploadID();
-    Preconditions.checkNotNull(uploadID, "uploadID cannot be null");
-    validateS3Bucket(volumeName, bucketName);
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    OmBucketInfo bucketInfo;
-    try {
-      String multipartKey = metadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, uploadID);
-      OmMultipartKeyInfo multipartKeyInfo = metadataManager
-          .getMultipartInfoTable().get(multipartKey);
-      OmKeyInfo openKeyInfo =
-          metadataManager.getOpenKeyTable(getBucketLayout()).get(multipartKey);
-
-      // If there is no entry in openKeyTable, then there is no multipart
-      // upload initiated for this key.
-      if (openKeyInfo == null) {
-        LOG.error("Abort Multipart Upload Failed: volume: {} bucket: {} "
-                + "key: {} with error no such uploadID: {}", volumeName,
-                bucketName, keyName, uploadID);
-        throw new OMException("Abort Multipart Upload Failed: volume: " +
-            volumeName + "bucket: " + bucketName + "key: " + keyName,
-            ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
-      } else {
-        // Move all the parts to delete table
-        TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo
-            .getPartKeyInfoMap();
-        DBStore store = metadataManager.getStore();
-        try (BatchOperation batch = store.initBatchOperation()) {
-          for (Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry : partKeyInfoMap
-              .entrySet()) {
-            PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
-            OmKeyInfo currentKeyPartInfo = OmKeyInfo.getFromProtobuf(
-                partKeyInfo.getPartKeyInfo());
-
-            RepeatedOmKeyInfo repeatedOmKeyInfo =
-                metadataManager.getDeletedTable()
-                    .get(partKeyInfo.getPartName());
-
-            repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-                currentKeyPartInfo, repeatedOmKeyInfo, 0L, false);
-
-            metadataManager.getDeletedTable().putWithBatch(batch,
-                partKeyInfo.getPartName(), repeatedOmKeyInfo);
-          }
-          // Finally delete the entry from the multipart info table and open
-          // key table
-          metadataManager.getMultipartInfoTable().deleteWithBatch(batch,
-              multipartKey);
-          metadataManager.getOpenKeyTable(getBucketLayout())
-              .deleteWithBatch(batch, multipartKey);
-          store.commitBatchOperation(batch);
-        }
-      }
-    } catch (OMException ex) {
-      throw ex;
-    } catch (IOException ex) {
-      LOG.error("Abort Multipart Upload Failed: volume: " + volumeName +
-          "bucket: " + bucketName + "key: " + keyName, ex);
-      throw new OMException(ex.getMessage(), ResultCodes
-          .ABORT_MULTIPART_UPLOAD_FAILED);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-
-  }
-
-  @Override
   public OmMultipartUploadList listMultipartUploads(String volumeName,
       String bucketName, String prefix) throws OMException {
     Preconditions.checkNotNull(volumeName);
@@ -2132,53 +1330,6 @@ public class KeyManagerImpl implements KeyManager {
             FILE_NOT_FOUND);
   }
 
-  /**
-   * Ozone FS api to create a directory. Parent directories if do not exist
-   * are created for the input directory.
-   *
-   * @param args Key args
-   * @throws OMException if any entry in the path exists as a file
-   *                     if bucket does not exist
-   * @throws IOException if there is error in the db
-   *                     invalid arguments
-   */
-  @Override
-  public void createDirectory(OmKeyArgs args) throws IOException {
-    Preconditions.checkNotNull(args, "Key args can not be null");
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    try {
-
-      // Check if this is the root of the filesystem.
-      if (keyName.length() == 0) {
-        return;
-      }
-
-      Path keyPath = Paths.get(keyName);
-      OzoneFileStatus status =
-          verifyNoFilesInPath(volumeName, bucketName, keyPath, false);
-      if (status != null && status.getTrimmedName()
-          .equals(keyName)) {
-        // if directory already exists
-        return;
-      }
-      OmKeyInfo dirDbKeyInfo =
-          createDirectoryKey(volumeName, bucketName, keyName, args.getAcls());
-      String dirDbKey = metadataManager
-          .getOzoneKey(volumeName, bucketName, dirDbKeyInfo.getKeyName());
-      metadataManager
-          .getKeyTable(getBucketLayout(metadataManager, volumeName, bucketName))
-          .put(dirDbKey, dirDbKeyInfo);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-  }
-
   private OmKeyInfo createDirectoryKey(String volumeName, String bucketName,
       String keyName, List<OzoneAcl> acls) throws IOException {
     // verify bucket exists
@@ -2200,65 +1351,6 @@ public class KeyManagerImpl implements KeyManager {
         .setAcls(acls)
         .build();
   }
-
-  /**
-   * OzoneFS api to creates an output stream for a file.
-   *
-   * @param args        Key args
-   * @param isOverWrite if true existing file at the location will be
-   *                    overwritten
-   * @param isRecursive if true file would be created even if parent
-   *                    directories do not exist
-   * @throws OMException if given key is a directory
-   *                     if file exists and isOverwrite flag is false
-   *                     if an ancestor exists as a file
-   *                     if bucket does not exist
-   * @throws IOException if there is error in the db
-   *                     invalid arguments
-   */
-  @Override
-  public OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite,
-      boolean isRecursive) throws IOException {
-    Preconditions.checkNotNull(args, "Key args can not be null");
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    OpenKeySession keySession;
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    try {
-      OzoneFileStatus fileStatus;
-      try {
-        fileStatus = getFileStatus(args);
-        if (fileStatus.isDirectory()) {
-          throw new OMException("Can not write to directory: " + keyName,
-              ResultCodes.NOT_A_FILE);
-        } else if (fileStatus.isFile()) {
-          if (!isOverWrite) {
-            throw new OMException("File " + keyName + " already exists",
-                ResultCodes.FILE_ALREADY_EXISTS);
-          }
-        }
-      } catch (OMException ex) {
-        if (ex.getResult() != FILE_NOT_FOUND) {
-          throw ex;
-        }
-      }
-
-      verifyNoFilesInPath(volumeName, bucketName,
-          Paths.get(keyName).getParent(), !isRecursive);
-      // TODO: Optimize call to openKey as keyInfo is already available in the
-      // filestatus. We can avoid some operations in openKey call.
-      keySession = openKey(args);
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-          bucketName);
-    }
-
-    return keySession;
-  }
-
   /**
    * OzoneFS api to lookup for a file.
    *
@@ -3279,10 +2371,6 @@ public class KeyManagerImpl implements KeyManager {
     return false;
   }
 
-  private BucketLayout getBucketLayout() {
-    return BucketLayout.DEFAULT;
-  }
-
   private BucketLayout getBucketLayout(OMMetadataManager omMetadataManager,
       String volName, String buckName) {
     if (omMetadataManager == null) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
index a7329c8..4db738a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.om.fs;
 import org.apache.hadoop.ozone.om.IOzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 
 import java.io.IOException;
@@ -53,11 +52,6 @@ public interface OzoneManagerFS extends IOzoneAcl {
   OzoneFileStatus getFileStatus(OmKeyArgs args, String clientAddress)
           throws IOException;
 
-  void createDirectory(OmKeyArgs args) throws IOException;
-
-  OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite,
-      boolean isRecursive) throws IOException;
-
   /**
    * Look up a file. Return the info of the file to client side.
    *
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
new file mode 100644
index 0000000..a8cbdc6
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+/**
+ * Test utility for creating a dummy OM, the associated
+ * managers, and writeClient.
+ */
+public final class OmTestManagers {
+
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private KeyManager keyManager;
+  private OMMetadataManager metadataManager;
+  private VolumeManager volumeManager;
+  private BucketManager bucketManager;
+  private PrefixManager prefixManager;
+
+  public OzoneManager getOzoneManager() {
+    return om;
+  }
+  public OzoneManagerProtocol getWriteClient() {
+    return writeClient;
+  }
+  public BucketManager getBucketManager() {
+    return bucketManager;
+  }
+  public VolumeManager getVolumeManager() {
+    return volumeManager;
+  }
+  public PrefixManager getPrefixManager() {
+    return prefixManager;
+  }
+  public OMMetadataManager getMetadataManager() {
+    return metadataManager;
+  }
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
+
+  public OmTestManagers(OzoneConfiguration conf)
+      throws AuthenticationException, IOException {
+    this(conf, null, null);
+  }
+
+  public OmTestManagers(OzoneConfiguration conf,
+                        ScmBlockLocationProtocol blockClient,
+                        StorageContainerLocationProtocol containerClient)
+      throws AuthenticationException, IOException {
+    if (containerClient == null) {
+      containerClient =
+          Mockito.mock(StorageContainerLocationProtocol.class);
+    }
+    if (blockClient == null) {
+      blockClient =
+          new ScmBlockLocationTestingClient(null, null, 0);
+    }
+
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    OMStorage omStorage = new OMStorage(conf);
+    omStorage.setClusterId("omtest");
+    omStorage.setOmId("omtest");
+    omStorage.initialize();
+    OzoneManager.setTestSecureOmFlag(true);
+    om = OzoneManager.createOm(conf,
+        OzoneManager.StartupOption.REGUALR);
+
+    keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
+        .getInternalState(om, "keyManager");
+    ScmClient scmClient = new ScmClient(blockClient, containerClient);
+    HddsWhiteboxTestUtils.setInternalState(om,
+        "scmClient", scmClient);
+    HddsWhiteboxTestUtils.setInternalState(keyManager,
+        "scmClient", scmClient);
+    HddsWhiteboxTestUtils.setInternalState(keyManager,
+        "secretManager", Mockito.mock(OzoneBlockTokenSecretManager.class));
+
+    om.start();
+    writeClient = OzoneClientFactory.getRpcClient(conf)
+        .getObjectStore().getClientProxy().getOzoneManagerClient();
+    metadataManager = (OmMetadataManagerImpl) HddsWhiteboxTestUtils
+        .getInternalState(om, "metadataManager");
+    volumeManager = (VolumeManagerImpl)HddsWhiteboxTestUtils
+        .getInternalState(om, "volumeManager");
+    bucketManager = (BucketManagerImpl)HddsWhiteboxTestUtils
+        .getInternalState(om, "bucketManager");
+    prefixManager = (PrefixManagerImpl)HddsWhiteboxTestUtils
+        .getInternalState(om, "prefixManager");
+
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
index 7d5fb60..0a0a993 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -31,12 +30,15 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
 
@@ -44,6 +46,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,6 +64,8 @@ import org.junit.rules.TemporaryFolder;
 public class TestKeyDeletingService {
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
 
   private OzoneConfiguration createConfAndInitValues() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
@@ -79,6 +84,11 @@ public class TestKeyDeletingService {
     return conf;
   }
 
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
   /**
    * In this test, we create a bunch of keys and delete them. Then we start the
    * KeyDeletingService and pass a SCMClient which does not fail. We make sure
@@ -90,14 +100,15 @@ public class TestKeyDeletingService {
 
   @Test(timeout = 30000)
   public void checkIfDeleteServiceisDeletingKeys()
-      throws IOException, TimeoutException, InterruptedException {
+      throws IOException, TimeoutException, InterruptedException,
+      AuthenticationException {
     OzoneConfiguration conf = createConfAndInitValues();
-    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
-    KeyManager keyManager =
-        new KeyManagerImpl(
-            new ScmBlockLocationTestingClient(null, null, 0),
-            metaMgr, conf, UUID.randomUUID().toString(), null);
-    keyManager.start(conf);
+    OmTestManagers omTestManagers
+        = new OmTestManagers(conf);
+    KeyManager keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+
     final int keyCount = 100;
     createAndDeleteKeys(keyManager, keyCount, 1);
     KeyDeletingService keyDeletingService =
@@ -112,20 +123,22 @@ public class TestKeyDeletingService {
 
   @Test(timeout = 30000)
   public void checkIfDeleteServiceWithFailingSCM()
-      throws IOException, TimeoutException, InterruptedException {
+      throws IOException, TimeoutException, InterruptedException,
+      AuthenticationException {
     OzoneConfiguration conf = createConfAndInitValues();
-    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
-    //failCallsFrequency = 1 , means all calls fail.
-    KeyManager keyManager =
-        new KeyManagerImpl(
-            new ScmBlockLocationTestingClient(null, null, 1),
-            metaMgr, conf, UUID.randomUUID().toString(), null);
-    keyManager.start(conf);
+    ScmBlockLocationProtocol blockClient =
+        //failCallsFrequency = 1 , means all calls fail.
+        new ScmBlockLocationTestingClient(null, null, 1);
+    OmTestManagers omTestManagers
+        = new OmTestManagers(conf, blockClient, null);
+    KeyManager keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+
     final int keyCount = 100;
     createAndDeleteKeys(keyManager, keyCount, 1);
     KeyDeletingService keyDeletingService =
         (KeyDeletingService) keyManager.getDeletingService();
-    keyManager.start(conf);
     Assert.assertEquals(
         keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), keyCount);
     // Make sure that we have run the background thread 5 times more
@@ -140,20 +153,22 @@ public class TestKeyDeletingService {
 
   @Test(timeout = 30000)
   public void checkDeletionForEmptyKey()
-      throws IOException, TimeoutException, InterruptedException {
+      throws IOException, TimeoutException, InterruptedException,
+      AuthenticationException {
     OzoneConfiguration conf = createConfAndInitValues();
-    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
-    //failCallsFrequency = 1 , means all calls fail.
-    KeyManager keyManager =
-        new KeyManagerImpl(
-            new ScmBlockLocationTestingClient(null, null, 1),
-            metaMgr, conf, UUID.randomUUID().toString(), null);
-    keyManager.start(conf);
+    ScmBlockLocationProtocol blockClient =
+        //failCallsFrequency = 1 , means all calls fail.
+        new ScmBlockLocationTestingClient(null, null, 1);
+    OmTestManagers omTestManagers
+        = new OmTestManagers(conf, blockClient, null);
+    KeyManager keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+
     final int keyCount = 100;
     createAndDeleteKeys(keyManager, keyCount, 0);
     KeyDeletingService keyDeletingService =
         (KeyDeletingService) keyManager.getDeletingService();
-    keyManager.start(conf);
 
     // Since empty keys are directly deleted from db there should be no
     // pending deletion keys. Also deletedKeyCount should be zero.
@@ -205,13 +220,13 @@ public class TestKeyDeletingService {
               .setLocationInfoList(new ArrayList<>())
               .build();
       //Open, Commit and Delete the Keys in the Key Manager.
-      OpenKeySession session = keyManager.openKey(arg);
+      OpenKeySession session = writeClient.openKey(arg);
       for (int i = 0; i < numBlocks; i++) {
         arg.addLocationInfo(
-            keyManager.allocateBlock(arg, session.getId(), new ExcludeList()));
+            writeClient.allocateBlock(arg, session.getId(), new ExcludeList()));
       }
-      keyManager.commitKey(arg, session.getId());
-      keyManager.deleteKey(arg);
+      writeClient.commitKey(arg, session.getId());
+      writeClient.deleteKey(arg);
     }
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index b310079..c1094f7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -64,9 +64,9 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.ozone.test.GenericTestUtils;
 
 import org.apache.hadoop.util.Time;
@@ -89,7 +89,7 @@ import static org.mockito.Mockito.when;
 public class TestKeyManagerUnit {
 
   private OzoneConfiguration configuration;
-  private OmMetadataManagerImpl metadataManager;
+  private OMMetadataManager metadataManager;
   private StorageContainerLocationProtocol containerClient;
   private KeyManagerImpl keyManager;
 
@@ -97,25 +97,30 @@ public class TestKeyManagerUnit {
   private File testDir;
   private ScmBlockLocationProtocol blockClient;
 
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  
   @Before
-  public void setup() throws IOException {
+  public void setup() throws Exception {
     configuration = new OzoneConfiguration();
     testDir = GenericTestUtils.getRandomizedTestDir();
     configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         testDir.toString());
-    metadataManager = new OmMetadataManagerImpl(configuration);
     containerClient = Mockito.mock(StorageContainerLocationProtocol.class);
     blockClient = Mockito.mock(ScmBlockLocationProtocol.class);
-    keyManager = new KeyManagerImpl(
-        blockClient, containerClient, metadataManager, configuration,
-        "omtest", Mockito.mock(OzoneBlockTokenSecretManager.class));
 
+    OmTestManagers omTestManagers
+        = new OmTestManagers(configuration, blockClient, containerClient);
+    om = omTestManagers.getOzoneManager();
+    metadataManager = omTestManagers.getMetadataManager();
+    keyManager = (KeyManagerImpl)omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
     startDate = Instant.now();
   }
 
   @After
   public void cleanup() throws Exception {
-    metadataManager.stop();
+    om.stop();
     FileUtils.deleteDirectory(testDir);
   }
 
@@ -125,7 +130,7 @@ public class TestKeyManagerUnit {
     createBucket(metadataManager, "vol1", "bucket1");
 
     OmMultipartInfo omMultipartInfo =
-        initMultipartUpload(keyManager, "vol1", "bucket1", "dir/key1");
+        initMultipartUpload(writeClient, "vol1", "bucket1", "dir/key1");
 
     //WHEN
     OmMultipartUploadListParts omMultipartUploadListParts = keyManager
@@ -143,9 +148,9 @@ public class TestKeyManagerUnit {
     createBucket(metadataManager, "vol1", "bucket1");
     createBucket(metadataManager, "vol1", "bucket2");
 
-    initMultipartUpload(keyManager, "vol1", "bucket1", "dir/key1");
-    initMultipartUpload(keyManager, "vol1", "bucket1", "dir/key2");
-    initMultipartUpload(keyManager, "vol1", "bucket2", "dir/key1");
+    initMultipartUpload(writeClient, "vol1", "bucket1", "dir/key1");
+    initMultipartUpload(writeClient, "vol1", "bucket1", "dir/key2");
+    initMultipartUpload(writeClient, "vol1", "bucket2", "dir/key1");
 
     //WHEN
     OmMultipartUploadList omMultipartUploadList =
@@ -178,11 +183,11 @@ public class TestKeyManagerUnit {
     // Add few to cache and few to DB.
     addinitMultipartUploadToCache(volume, bucket, "dir/key1");
 
-    initMultipartUpload(keyManager, volume, bucket, "dir/key2");
+    initMultipartUpload(writeClient, volume, bucket, "dir/key2");
 
     addinitMultipartUploadToCache(volume, bucket, "dir/key3");
 
-    initMultipartUpload(keyManager, volume, bucket, "dir/key4");
+    initMultipartUpload(writeClient, volume, bucket, "dir/key4");
 
     //WHEN
     OmMultipartUploadList omMultipartUploadList =
@@ -201,12 +206,12 @@ public class TestKeyManagerUnit {
     // Same way add few to cache and few to DB.
     addinitMultipartUploadToCache(volume, bucket, "dir/ozonekey1");
 
-    initMultipartUpload(keyManager, volume, bucket, "dir/ozonekey2");
+    initMultipartUpload(writeClient, volume, bucket, "dir/ozonekey2");
 
     OmMultipartInfo omMultipartInfo3 =addinitMultipartUploadToCache(volume,
         bucket, "dir/ozonekey3");
 
-    OmMultipartInfo omMultipartInfo4 = initMultipartUpload(keyManager,
+    OmMultipartInfo omMultipartInfo4 = initMultipartUpload(writeClient,
         volume, bucket, "dir/ozonekey4");
 
     omMultipartUploadList =
@@ -258,13 +263,13 @@ public class TestKeyManagerUnit {
     createBucket(metadataManager, "vol1", "bucket1");
     createBucket(metadataManager, "vol1", "bucket2");
 
-    initMultipartUpload(keyManager, "vol1", "bucket1", "dip/key1");
+    initMultipartUpload(writeClient, "vol1", "bucket1", "dip/key1");
 
-    initMultipartUpload(keyManager, "vol1", "bucket1", "dir/key1");
-    initMultipartUpload(keyManager, "vol1", "bucket1", "dir/key2");
-    initMultipartUpload(keyManager, "vol1", "bucket1", "key3");
+    initMultipartUpload(writeClient, "vol1", "bucket1", "dir/key1");
+    initMultipartUpload(writeClient, "vol1", "bucket1", "dir/key2");
+    initMultipartUpload(writeClient, "vol1", "bucket1", "key3");
 
-    initMultipartUpload(keyManager, "vol1", "bucket2", "dir/key1");
+    initMultipartUpload(writeClient, "vol1", "bucket2", "dir/key1");
 
     //WHEN
     OmMultipartUploadList omMultipartUploadList =
@@ -277,7 +282,7 @@ public class TestKeyManagerUnit {
     Assert.assertEquals("dir/key2", uploads.get(1).getKeyName());
   }
 
-  private void createBucket(OmMetadataManagerImpl omMetadataManager,
+  private void createBucket(OMMetadataManager omMetadataManager,
       String volume, String bucket)
       throws IOException {
     OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
@@ -287,10 +292,10 @@ public class TestKeyManagerUnit {
         .setIsVersionEnabled(false)
         .setAcls(new ArrayList<>())
         .build();
-    TestOMRequestUtils.addBucketToOM(metadataManager, omBucketInfo);
+    TestOMRequestUtils.addBucketToOM(omMetadataManager, omBucketInfo);
   }
 
-  private OmMultipartInfo initMultipartUpload(KeyManagerImpl omtest,
+  private OmMultipartInfo initMultipartUpload(OzoneManagerProtocol omtest,
       String volume, String bucket, String key)
       throws IOException {
     OmKeyArgs key1 = new Builder()
@@ -301,7 +306,8 @@ public class TestKeyManagerUnit {
             new RatisReplicationConfig(ReplicationFactor.THREE))
         .setAcls(new ArrayList<>())
         .build();
-    return omtest.initiateMultipartUpload(key1);
+    OmMultipartInfo omMultipartInfo = omtest.initiateMultipartUpload(key1);
+    return omMultipartInfo;
   }
 
   private OmMultipartInfo addinitMultipartUploadToCache(
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
index 0413036..2700ae5 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
@@ -29,7 +29,10 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -40,7 +43,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.UUID;
 
 /**
  * Test Key Trash Service.
@@ -57,12 +59,13 @@ public class TestTrashService {
   public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private KeyManager keyManager;
-  private OmMetadataManagerImpl omMetadataManager;
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
   private String volumeName;
   private String bucketName;
 
   @Before
-  public void setup() throws IOException {
+  public void setup() throws IOException, AuthenticationException {
     OzoneConfiguration configuration = new OzoneConfiguration();
 
     File folder = tempFolder.newFolder();
@@ -72,24 +75,27 @@ public class TestTrashService {
     System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
     ServerUtils.setOzoneMetaDirPath(configuration, folder.toString());
 
-    omMetadataManager = new OmMetadataManagerImpl(configuration);
-
-    keyManager = new KeyManagerImpl(
-        new ScmBlockLocationTestingClient(null, null, 0),
-        omMetadataManager, configuration, UUID.randomUUID().toString(), null);
-    keyManager.start(configuration);
-
+    OmTestManagers omTestManagers
+        = new OmTestManagers(configuration);
+    keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
     volumeName = "volume";
     bucketName = "bucket";
   }
 
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
   @Test
   public void testRecoverTrash() throws IOException {
     String keyName = "testKey";
     String destinationBucket = "destBucket";
     createAndDeleteKey(keyName);
 
-    boolean recoverOperation = omMetadataManager
+    boolean recoverOperation = keyManager.getMetadataManager()
         .recoverTrash(volumeName, bucketName, keyName, destinationBucket);
     Assert.assertTrue(recoverOperation);
   }
@@ -120,9 +126,9 @@ public class TestTrashService {
         .build();
 
     /* Create and delete key in the Key Manager. */
-    OpenKeySession session = keyManager.openKey(keyArgs);
-    keyManager.commitKey(keyArgs, session.getId());
-    keyManager.deleteKey(keyArgs);
+    OpenKeySession session = writeClient.openKey(keyArgs);
+    writeClient.commitKey(keyArgs, session.getId());
+    writeClient.deleteKey(keyArgs);
   }
 
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
index 46774fe..fc60de3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
@@ -21,24 +21,22 @@ import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.om.BucketManagerImpl;
-import org.apache.hadoop.ozone.om.IOzoneAcl;
-import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.BucketManager;
+import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.PrefixManager;
-import org.apache.hadoop.ozone.om.PrefixManagerImpl;
-import org.apache.hadoop.ozone.om.VolumeManagerImpl;
+import org.apache.hadoop.ozone.om.VolumeManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -78,8 +76,6 @@ import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.PREFIX;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Test class for {@link OzoneNativeAuthorizer}.
@@ -96,9 +92,10 @@ public class TestOzoneNativeAuthorizer {
   private ACLType parentDirGroupAcl;
   private boolean expectedAclResult;
 
-  private static KeyManagerImpl keyManager;
-  private static VolumeManagerImpl volumeManager;
-  private static BucketManagerImpl bucketManager;
+  private static OzoneManagerProtocol writeClient;
+  private static KeyManager keyManager;
+  private static VolumeManager volumeManager;
+  private static BucketManager bucketManager;
   private static PrefixManager prefixManager;
   private static OMMetadataManager metadataManager;
   private static OzoneNativeAuthorizer nativeAuthorizer;
@@ -150,14 +147,14 @@ public class TestOzoneNativeAuthorizer {
     ozConfig.set(OZONE_METADATA_DIRS, dir.toString());
     ozConfig.set(OZONE_ADMINISTRATORS, "om");
 
-    metadataManager = new OmMetadataManagerImpl(ozConfig);
-    volumeManager = new VolumeManagerImpl(metadataManager, ozConfig);
-    bucketManager = new BucketManagerImpl(metadataManager);
-    prefixManager = new PrefixManagerImpl(metadataManager, false);
-
-    keyManager = new KeyManagerImpl(mock(ScmBlockLocationProtocol.class),
-        metadataManager, ozConfig, "om1", null);
-
+    OmTestManagers omTestManagers =
+        new OmTestManagers(ozConfig);
+    metadataManager = omTestManagers.getMetadataManager();
+    volumeManager = omTestManagers.getVolumeManager();
+    bucketManager = omTestManagers.getBucketManager();
+    prefixManager = omTestManagers.getPrefixManager();
+    keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
     nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
         keyManager, prefixManager,
         Collections.singletonList("om"));
@@ -181,14 +178,14 @@ public class TestOzoneNativeAuthorizer {
         .build();
 
     if (keyName.split(OZONE_URI_DELIMITER).length > 1) {
-      keyManager.createDirectory(keyArgs);
+      writeClient.createDirectory(keyArgs);
       key = key + OZONE_URI_DELIMITER;
     } else {
-      OpenKeySession keySession = keyManager.createFile(keyArgs, true, false);
+      OpenKeySession keySession = writeClient.createFile(keyArgs, true, false);
       keyArgs.setLocationInfoList(
           keySession.getKeyInfo().getLatestVersionLocations()
               .getLocationList());
-      keyManager.commitKey(keyArgs, keySession.getId());
+      writeClient.commitKey(keyArgs, keySession.getId());
     }
 
     keyObj = new OzoneObjInfo.Builder()
@@ -232,10 +229,10 @@ public class TestOzoneNativeAuthorizer {
   @Test
   public void testCheckAccessForVolume() throws Exception {
     expectedAclResult = true;
-    resetAclsAndValidateAccess(volObj, USER, volumeManager);
-    resetAclsAndValidateAccess(volObj, GROUP, volumeManager);
-    resetAclsAndValidateAccess(volObj, WORLD, volumeManager);
-    resetAclsAndValidateAccess(volObj, ANONYMOUS, volumeManager);
+    resetAclsAndValidateAccess(volObj, USER, writeClient);
+    resetAclsAndValidateAccess(volObj, GROUP, writeClient);
+    resetAclsAndValidateAccess(volObj, WORLD, writeClient);
+    resetAclsAndValidateAccess(volObj, ANONYMOUS, writeClient);
   }
 
   @Test
@@ -251,10 +248,10 @@ public class TestOzoneNativeAuthorizer {
     setVolumeAcl(Arrays.asList(userAcl, groupAcl));
 
 
-    resetAclsAndValidateAccess(buckObj, USER, bucketManager);
-    resetAclsAndValidateAccess(buckObj, GROUP, bucketManager);
-    resetAclsAndValidateAccess(buckObj, WORLD, bucketManager);
-    resetAclsAndValidateAccess(buckObj, ANONYMOUS, bucketManager);
+    resetAclsAndValidateAccess(buckObj, USER, writeClient);
+    resetAclsAndValidateAccess(buckObj, GROUP, writeClient);
+    resetAclsAndValidateAccess(buckObj, WORLD, writeClient);
+    resetAclsAndValidateAccess(buckObj, ANONYMOUS, writeClient);
   }
 
   @Test
@@ -269,10 +266,10 @@ public class TestOzoneNativeAuthorizer {
     setVolumeAcl(Arrays.asList(userAcl, groupAcl));
     setBucketAcl(Arrays.asList(userAcl, groupAcl));
 
-    resetAclsAndValidateAccess(keyObj, USER, keyManager);
-    resetAclsAndValidateAccess(keyObj, GROUP, keyManager);
-    resetAclsAndValidateAccess(keyObj, WORLD, keyManager);
-    resetAclsAndValidateAccess(keyObj, ANONYMOUS, keyManager);
+    resetAclsAndValidateAccess(keyObj, USER, writeClient);
+    resetAclsAndValidateAccess(keyObj, GROUP, writeClient);
+    resetAclsAndValidateAccess(keyObj, WORLD, writeClient);
+    resetAclsAndValidateAccess(keyObj, ANONYMOUS, writeClient);
   }
 
   @Test
@@ -297,10 +294,10 @@ public class TestOzoneNativeAuthorizer {
     setBucketAcl(Arrays.asList(userAcl, groupAcl));
 
 
-    resetAclsAndValidateAccess(prefixObj, USER, prefixManager);
-    resetAclsAndValidateAccess(prefixObj, GROUP, prefixManager);
-    resetAclsAndValidateAccess(prefixObj, WORLD, prefixManager);
-    resetAclsAndValidateAccess(prefixObj, ANONYMOUS, prefixManager);
+    resetAclsAndValidateAccess(prefixObj, USER, writeClient);
+    resetAclsAndValidateAccess(prefixObj, GROUP, writeClient);
+    resetAclsAndValidateAccess(prefixObj, WORLD, writeClient);
+    resetAclsAndValidateAccess(prefixObj, ANONYMOUS, writeClient);
   }
 
 
@@ -347,7 +344,7 @@ public class TestOzoneNativeAuthorizer {
   }
 
   private void resetAclsAndValidateAccess(OzoneObj obj,
-      ACLIdentityType accessType, IOzoneAcl aclImplementor)
+      ACLIdentityType accessType, OzoneManagerProtocol aclImplementor)
       throws IOException {
     List<OzoneAcl> acls;
     String user = testUgi.getUserName();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
index 48ca158..8702302 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
@@ -23,17 +23,15 @@ import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.om.BucketManagerImpl;
-import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.BucketManager;
+import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.PrefixManager;
-import org.apache.hadoop.ozone.om.PrefixManagerImpl;
-import org.apache.hadoop.ozone.om.VolumeManagerImpl;
+import org.apache.hadoop.ozone.om.VolumeManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -41,9 +39,11 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -73,24 +73,24 @@ import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
-import static org.mockito.Mockito.mock;
 
 /**
  * Test parent acl requirements when accessing children with native authorizer.
  */
 public class TestParentAcl {
   private static OzoneConfiguration ozConfig;
-  private static KeyManagerImpl keyManager;
-  private static VolumeManagerImpl volumeManager;
-  private static BucketManagerImpl bucketManager;
+  private static KeyManager keyManager;
+  private static VolumeManager volumeManager;
+  private static BucketManager bucketManager;
   private static PrefixManager prefixManager;
   private static OMMetadataManager metadataManager;
   private static OzoneNativeAuthorizer nativeAuthorizer;
   private static UserGroupInformation adminUgi;
   private static UserGroupInformation testUgi, testUgi1;
+  private static OzoneManagerProtocol writeClient;
 
   @BeforeClass
-  public static void setup() throws IOException {
+  public static void setup() throws IOException, AuthenticationException {
     ozConfig = new OzoneConfiguration();
     ozConfig.set(OZONE_ACL_AUTHORIZER_CLASS,
         OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
@@ -98,14 +98,14 @@ public class TestParentAcl {
     ozConfig.set(OZONE_METADATA_DIRS, dir.toString());
     ozConfig.set(OZONE_ADMINISTRATORS, "om");
 
-    metadataManager = new OmMetadataManagerImpl(ozConfig);
-    volumeManager = new VolumeManagerImpl(metadataManager, ozConfig);
-    bucketManager = new BucketManagerImpl(metadataManager);
-    prefixManager = new PrefixManagerImpl(metadataManager, false);
-
-    keyManager = new KeyManagerImpl(mock(ScmBlockLocationProtocol.class),
-        metadataManager, ozConfig, "om1", null);
-
+    OmTestManagers omTestManagers =
+        new OmTestManagers(ozConfig);
+    metadataManager = omTestManagers.getMetadataManager();
+    volumeManager = omTestManagers.getVolumeManager();
+    bucketManager = omTestManagers.getBucketManager();
+    prefixManager = omTestManagers.getPrefixManager();
+    keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
     nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
         keyManager, prefixManager,
         Collections.singletonList("om"));
@@ -396,13 +396,13 @@ public class TestParentAcl {
 
 
     if (keyName.split(OZONE_URI_DELIMITER).length > 1) {
-      keyManager.createDirectory(keyArgs);
+      writeClient.createDirectory(keyArgs);
     } else {
-      OpenKeySession keySession = keyManager.createFile(keyArgs, true, false);
+      OpenKeySession keySession = writeClient.createFile(keyArgs, true, false);
       keyArgs.setLocationInfoList(
           keySession.getKeyInfo().getLatestVersionLocations()
               .getLocationList());
-      keyManager.commitKey(keyArgs, keySession.getId());
+      writeClient.commitKey(keyArgs, keySession.getId());
     }
 
     return new OzoneObjInfo.Builder()
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
index 06516e9..c54391b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
@@ -21,21 +21,21 @@ package org.apache.hadoop.ozone.security.acl;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.ozone.om.BucketManagerImpl;
-import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.BucketManager;
+import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.PrefixManager;
-import org.apache.hadoop.ozone.om.PrefixManagerImpl;
-import org.apache.hadoop.ozone.om.VolumeManagerImpl;
+import org.apache.hadoop.ozone.om.VolumeManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -54,7 +54,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
-import static org.mockito.Mockito.mock;
 
 
 /**
@@ -64,28 +63,30 @@ public class TestVolumeOwner {
 
   private static OzoneConfiguration ozoneConfig;
   private static OzoneNativeAuthorizer nativeAuthorizer;
-  private static KeyManagerImpl keyManager;
-  private static VolumeManagerImpl volumeManager;
-  private static BucketManagerImpl bucketManager;
+  private static KeyManager keyManager;
+  private static VolumeManager volumeManager;
+  private static BucketManager bucketManager;
   private static PrefixManager prefixManager;
   private static OMMetadataManager metadataManager;
   private static UserGroupInformation testUgi;
+  private static OzoneManagerProtocol writeClient;
 
   @BeforeClass
-  public static void setup() throws IOException {
+  public static void setup() throws IOException, AuthenticationException {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.set(OZONE_ACL_AUTHORIZER_CLASS,
         OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
     File dir = GenericTestUtils.getRandomizedTestDir();
     ozoneConfig.set(OZONE_METADATA_DIRS, dir.toString());
 
-    metadataManager = new OmMetadataManagerImpl(ozoneConfig);
-    volumeManager = new VolumeManagerImpl(metadataManager, ozoneConfig);
-    bucketManager = new BucketManagerImpl(metadataManager);
-    keyManager = new KeyManagerImpl(mock(ScmBlockLocationProtocol.class),
-        metadataManager, ozoneConfig, "om1", null);
-    prefixManager = new PrefixManagerImpl(metadataManager, false);
-
+    OmTestManagers omTestManagers =
+        new OmTestManagers(ozoneConfig);
+    metadataManager = omTestManagers.getMetadataManager();
+    volumeManager = omTestManagers.getVolumeManager();
+    bucketManager = omTestManagers.getBucketManager();
+    keyManager = omTestManagers.getKeyManager();
+    prefixManager = omTestManagers.getPrefixManager();
+    writeClient = omTestManagers.getWriteClient();
     nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
         keyManager, prefixManager,
         Collections.singletonList("om"));
@@ -144,12 +145,12 @@ public class TestVolumeOwner {
                 testUgi.getUserName(), testUgi.getGroupNames(), NONE, NONE));
           }
           OmKeyArgs keyArgs = keyArgsBuilder.build();
-          OpenKeySession keySession = keyManager.createFile(keyArgs, true,
+          OpenKeySession keySession = writeClient.createFile(keyArgs, true,
               false);
           keyArgs.setLocationInfoList(
               keySession.getKeyInfo().getLatestVersionLocations()
                   .getLocationList());
-          keyManager.commitKey(keyArgs, keySession.getId());
+          writeClient.commitKey(keyArgs, keySession.getId());
         }
       }
     }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
deleted file mode 100644
index 6412392..0000000
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.hadoop.ozone.genesis;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.infra.Blackhole;
-
-/**
- * Benchmarks OzoneManager.
- */
-@State(Scope.Thread)
-public class BenchMarkOzoneManager {
-
-  private static String testDir;
-  private static OzoneManager om;
-  private static StorageContainerManager scm;
-  private static ReentrantLock lock = new ReentrantLock();
-  private static String volumeName = UUID.randomUUID().toString();
-  private static String bucketName = UUID.randomUUID().toString();
-  private static List<String> keyNames = new ArrayList<>();
-  private static List<Long> clientIDs = new ArrayList<>();
-
-  private static int numPipelines = 1;
-  private static int numContainersPerPipeline = 3;
-
-  @Setup(Level.Trial)
-  public static void initialize()
-      throws Exception {
-    try {
-      lock.lock();
-      if (scm == null) {
-        OzoneConfiguration conf = new OzoneConfiguration();
-        testDir = GenesisUtil.getTempPath()
-            .resolve(RandomStringUtils.randomNumeric(7)).toString();
-        conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir);
-
-        GenesisUtil.configureSCM(conf, 10);
-        GenesisUtil.configureOM(conf, 20);
-        conf.setInt(OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
-            numContainersPerPipeline);
-        GenesisUtil.addPipelines(ReplicationFactor.THREE, numPipelines, conf);
-
-        scm = GenesisUtil.getScm(conf, new SCMConfigurator());
-        scm.start();
-        om = GenesisUtil.getOm(conf);
-        om.start();
-
-        // prepare SCM
-        PipelineManager pipelineManager = scm.getPipelineManager();
-        for (Pipeline pipeline : pipelineManager
-            .getPipelines(
-                new RatisReplicationConfig(ReplicationFactor.THREE))) {
-          pipelineManager.openPipeline(pipeline.getId());
-        }
-        scm.getEventQueue().fireEvent(SCMEvents.SAFE_MODE_STATUS,
-            new SCMSafeModeManager.SafeModeStatus(false, false));
-        Thread.sleep(1000);
-
-        // prepare OM
-        om.createVolume(new OmVolumeArgs.Builder().setVolume(volumeName)
-            .setAdminName(UserGroupInformation.getLoginUser().getUserName())
-            .setOwnerName(UserGroupInformation.getLoginUser().getUserName())
-            .build());
-        om.createBucket(new OmBucketInfo.Builder().setBucketName(bucketName)
-            .setVolumeName(volumeName).build());
-        createKeys(100000);
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private static void createKeys(int numKeys) throws IOException {
-    for (int i = 0; i < numKeys; i++) {
-      String key = UUID.randomUUID().toString();
-      OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
-          .setVolumeName(volumeName)
-          .setBucketName(bucketName)
-          .setKeyName(key)
-          .setDataSize(0)
-          .setReplicationConfig(
-              new RatisReplicationConfig(ReplicationFactor.THREE))
-          .build();
-      OpenKeySession keySession = om.getKeyManager().openKey(omKeyArgs);
-      long clientID = keySession.getId();
-      keyNames.add(key);
-      clientIDs.add(clientID);
-    }
-  }
-
-  @TearDown(Level.Trial)
-  public static void tearDown() {
-    try {
-      lock.lock();
-      if (scm != null) {
-        scm.stop();
-        scm.join();
-        scm = null;
-        om.stop();
-        om.join();
-        om = null;
-        FileUtil.fullyDelete(new File(testDir));
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Threads(4)
-  @Benchmark
-  public void allocateBlockBenchMark(BenchMarkOzoneManager state,
-      Blackhole bh) throws IOException {
-    int index = (int) (Math.random() * keyNames.size());
-    String key = keyNames.get(index);
-    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(key)
-        .setDataSize(50)
-        .setReplicationConfig(
-            new RatisReplicationConfig(ReplicationFactor.THREE))
-        .build();
-    state.om.allocateBlock(omKeyArgs, clientIDs.get(index), new ExcludeList());
-  }
-
-  @Threads(4)
-  @Benchmark
-  public void createAndCommitKeyBenchMark(BenchMarkOzoneManager state,
-      Blackhole bh) throws IOException {
-    String key = UUID.randomUUID().toString();
-    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(key)
-        .setDataSize(50)
-        .setReplicationConfig(
-            new RatisReplicationConfig(ReplicationFactor.THREE))
-        .build();
-    OpenKeySession openKeySession = state.om.openKey(omKeyArgs);
-    state.om.allocateBlock(omKeyArgs, openKeySession.getId(),
-        new ExcludeList());
-  }
-}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/Genesis.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/Genesis.java
index 605cc42..77da882 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/Genesis.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/Genesis.java
@@ -48,10 +48,9 @@ public final class Genesis {
       split = ",", description =
       "Option used for specifying benchmarks to run.\n"
           + "Ex. ozone genesis -benchmark BenchMarkContainerStateMap,"
-          + "BenchMarkOMKeyAllocation.\n"
           + "Possible benchmarks which can be used are "
           + "{BenchMarkContainerStateMap, "
-          + "BenchMarkOzoneManager, BenchMarkOMClient, "
+          + "BenchMarkOMClient, "
           + "BenchMarkSCM, BenchMarkMetadataStoreReads, "
           + "BenchMarkMetadataStoreWrites, BenchMarkDatanodeDispatcher, "
           + "BenchMarkRocksDbStore, BenchMarkCRCStreaming, BenchMarkCRCBatch}")

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to