This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new eafd2ccec0 HDDS-6964. [Snapshot] Split out shared "Path based access" code from OM. (#3653)
eafd2ccec0 is described below

commit eafd2ccec01ffcb5b9966fc957d5e6b1ce4b3ddc
Author: GeorgeJahad <[email protected]>
AuthorDate: Thu Aug 25 16:07:11 2022 -0700

    HDDS-6964. [Snapshot] Split out shared "Path based access" code from OM. (#3653)
---
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  42 +-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |   4 +
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  | 134 ------
 .../java/org/apache/hadoop/ozone/om/IOzoneAcl.java |  31 --
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 209 +--------
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  13 +-
 .../apache/hadoop/ozone/om/OmMetadataReader.java   | 495 +++++++++++++++++++++
 .../hadoop/ozone/om/OmMetadataReaderMetrics.java   |  45 ++
 .../org/apache/hadoop/ozone/om/OzoneAclUtils.java  |  16 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 373 ++--------------
 .../apache/hadoop/ozone/om/PrefixManagerImpl.java  | 104 -----
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  | 137 ------
 .../hadoop/ozone/om/request/OMClientRequest.java   |  17 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   2 +-
 .../ozone/om/request/key/TestOMKeyRequest.java     |   4 +
 15 files changed, 678 insertions(+), 948 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 3eeb3e93dd..fcb1047f36 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -611,6 +611,10 @@ public class TestKeyManagerImpl {
       }
     }
     Assert.assertEquals(2, matchEntries);
+    // cleanup
+    writeClient.removeAcl(ozPrefix1, ozAcl1);
+    writeClient.removeAcl(ozPrefix1, ozAcl2);
+    writeClient.removeAcl(ozPrefix1, ozAcl3);
   }
 
   @Test
@@ -635,7 +639,7 @@ public class TestKeyManagerImpl {
     // add acl with invalid prefix name
     exception.expect(OMException.class);
     exception.expectMessage("Invalid prefix name");
-    prefixManager.addAcl(ozInvalidPrefix, ozAcl1);
+    writeClient.addAcl(ozInvalidPrefix, ozAcl1);
 
     OzoneObj ozPrefix1 = new OzoneObjInfo.Builder()
         .setVolumeName(volumeName)
@@ -645,27 +649,27 @@ public class TestKeyManagerImpl {
         .setStoreType(OzoneObj.StoreType.OZONE)
         .build();
 
-    prefixManager.addAcl(ozPrefix1, ozAcl1);
-    List<OzoneAcl> ozAclGet = prefixManager.getAcl(ozPrefix1);
+    writeClient.addAcl(ozPrefix1, ozAcl1);
+    List<OzoneAcl> ozAclGet = writeClient.getAcl(ozPrefix1);
     Assert.assertEquals(1, ozAclGet.size());
     Assert.assertEquals(ozAcl1, ozAclGet.get(0));
 
     // get acl with invalid prefix name
     exception.expect(OMException.class);
     exception.expectMessage("Invalid prefix name");
-    prefixManager.getAcl(ozInvalidPrefix);
+    writeClient.getAcl(ozInvalidPrefix);
 
     // set acl with invalid prefix name
     List<OzoneAcl> ozoneAcls = new ArrayList<OzoneAcl>();
     ozoneAcls.add(ozAcl1);
     exception.expect(OMException.class);
     exception.expectMessage("Invalid prefix name");
-    prefixManager.setAcl(ozInvalidPrefix, ozoneAcls);
+    writeClient.setAcl(ozInvalidPrefix, ozoneAcls);
 
     // remove acl with invalid prefix name
     exception.expect(OMException.class);
     exception.expectMessage("Invalid prefix name");
-    prefixManager.removeAcl(ozInvalidPrefix, ozAcl1);
+    writeClient.removeAcl(ozInvalidPrefix, ozAcl1);
   }
 
   @Test
@@ -686,7 +690,7 @@ public class TestKeyManagerImpl {
 
     OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
         ACLType.READ, ACCESS);
-    prefixManager.addAcl(ozPrefix1, ozAcl1);
+    writeClient.addAcl(ozPrefix1, ozAcl1);
 
     OzoneObj ozFile1 = new OzoneObjInfo.Builder()
         .setVolumeName(volumeName)
@@ -717,6 +721,8 @@ public class TestKeyManagerImpl {
     for (int i = 0; i < 6; i++) {
       Assert.assertEquals(null, prefixInfos.get(i));
     }
+    // cleanup
+    writeClient.removeAcl(ozPrefix1, ozAcl1);
   }
 
   @Test
@@ -1009,17 +1015,17 @@ public class TestKeyManagerImpl {
     String keyNameDir1 = "dir1";
     OmKeyArgs keyArgsDir1 =
         createBuilder().setKeyName(keyNameDir1).build();
-    writeClient.createDirectory(keyArgsDir1);
+    addDirectory(keyArgsDir1);
 
     String keyNameDir1Subdir1 = "dir1" + OZONE_URI_DELIMITER + "subdir1";
     OmKeyArgs keyArgsDir1Subdir1 =
         createBuilder().setKeyName(keyNameDir1Subdir1).build();
-    writeClient.createDirectory(keyArgsDir1Subdir1);
+    addDirectory(keyArgsDir1Subdir1);
 
     String keyNameDir2 = "dir2";
     OmKeyArgs keyArgsDir2 =
         createBuilder().setKeyName(keyNameDir2).build();
-    writeClient.createDirectory(keyArgsDir2);
+    addDirectory(keyArgsDir2);
 
     OmKeyArgs rootDirArgs = createKeyArgs("");
     // Test listStatus with recursive=false, should only have dirs under root
@@ -1534,4 +1540,20 @@ public class TestKeyManagerImpl {
   private static BucketLayout getDefaultBucketLayout() {
     return BucketLayout.DEFAULT;
   }
+
+  private static void addDirectory(OmKeyArgs keyArgs) throws Exception {
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName() + "/")
+        .setFileName(OzoneFSUtils.getFileName(keyArgs.getKeyName()))
+        .setOmKeyLocationInfos(null)
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setDataSize(0)
+        .setReplicationConfig(keyArgs.getReplicationConfig())
+        .setFileEncryptionInfo(null).build();
+    OMRequestTestUtils.addKeyToTable(false, false, omKeyInfo,
+        1000L, 0L, metadataManager);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index c9babb8922..0a8570f763 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -307,9 +307,13 @@ public class TestOmMetrics {
         any(), any(), any(), any(), anyInt());
     Mockito.doThrow(exception).when(mockKm).listTrash(
         any(), any(), any(), any(), anyInt());
+    OmMetadataReader omMetadataReader = ozoneManager.getOmMetadataReader();
     HddsWhiteboxTestUtils.setInternalState(
         ozoneManager, "keyManager", mockKm);
 
+    HddsWhiteboxTestUtils.setInternalState(
+        omMetadataReader, "keyManager", mockKm);
+
     // inject exception to test for Failure Metrics on the write path
     mockWritePathExceptions(OmBucketInfo.class);
     keyArgs = createKeyArgs(volumeName, bucketName,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index 19687aaf14..5fcf8b0839 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -161,139 +160,6 @@ public class BucketManagerImpl implements BucketManager {
 
   }
 
-  /**
-   * Add acl for Ozone object. Return true if acl is added successfully else
-   * false.
-   *
-   * @param obj Ozone object for which acl should be added.
-   * @param acl ozone acl to be added.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    Objects.requireNonNull(obj);
-    Objects.requireNonNull(acl);
-    if (!obj.getResourceType().equals(OzoneObj.ResourceType.BUCKET)) {
-      throw new IllegalArgumentException("Unexpected argument passed to " +
-          "BucketManager. OzoneObj type:" + obj.getResourceType());
-    }
-    String volume = obj.getVolumeName();
-    String bucket = obj.getBucketName();
-    boolean changed = false;
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
-    try {
-      String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
-      OmBucketInfo bucketInfo =
-          metadataManager.getBucketTable().get(dbBucketKey);
-      if (bucketInfo == null) {
-        LOG.debug("Bucket:{}/{} does not exist", volume, bucket);
-        throw new OMException("Bucket " + bucket + " is not found",
-            BUCKET_NOT_FOUND);
-      }
-
-      changed = bucketInfo.addAcl(acl);
-      if (changed) {
-        metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
-      }
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Add acl operation failed for bucket:{}/{} acl:{}",
-            volume, bucket, acl, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
-    }
-
-    return changed;
-  }
-
-  /**
-   * Remove acl for Ozone object. Return true if acl is removed successfully
-   * else false.
-   *
-   * @param obj Ozone object.
-   * @param acl Ozone acl to be removed.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    Objects.requireNonNull(obj);
-    Objects.requireNonNull(acl);
-    if (!obj.getResourceType().equals(OzoneObj.ResourceType.BUCKET)) {
-      throw new IllegalArgumentException("Unexpected argument passed to " +
-          "BucketManager. OzoneObj type:" + obj.getResourceType());
-    }
-    String volume = obj.getVolumeName();
-    String bucket = obj.getBucketName();
-    boolean removed = false;
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
-    try {
-      String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
-      OmBucketInfo bucketInfo =
-          metadataManager.getBucketTable().get(dbBucketKey);
-      if (bucketInfo == null) {
-        LOG.debug("Bucket:{}/{} does not exist", volume, bucket);
-        throw new OMException("Bucket " + bucket + " is not found",
-            BUCKET_NOT_FOUND);
-      }
-      removed = bucketInfo.removeAcl(acl);
-      if (removed) {
-        metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
-      }
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Remove acl operation failed for bucket:{}/{} acl:{}",
-            volume, bucket, acl, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
-    }
-    return removed;
-  }
-
-  /**
-   * Acls to be set for given Ozone object. This operations reset ACL for given
-   * object to list of ACLs provided in argument.
-   *
-   * @param obj Ozone object.
-   * @param acls List of acls.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
-    Objects.requireNonNull(obj);
-    Objects.requireNonNull(acls);
-    if (!obj.getResourceType().equals(OzoneObj.ResourceType.BUCKET)) {
-      throw new IllegalArgumentException("Unexpected argument passed to " +
-          "BucketManager. OzoneObj type:" + obj.getResourceType());
-    }
-    String volume = obj.getVolumeName();
-    String bucket = obj.getBucketName();
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
-    try {
-      String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
-      OmBucketInfo bucketInfo =
-          metadataManager.getBucketTable().get(dbBucketKey);
-      if (bucketInfo == null) {
-        LOG.debug("Bucket:{}/{} does not exist", volume, bucket);
-        throw new OMException("Bucket " + bucket + " is not found",
-            BUCKET_NOT_FOUND);
-      }
-      bucketInfo.setAcls(acls);
-      metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Set acl operation failed for bucket:{}/{} acl:{}",
-            volume, bucket, StringUtils.join(",", acls), ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
-    }
-    return true;
-  }
 
   /**
    * Returns list of ACLs for given Ozone object.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/IOzoneAcl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/IOzoneAcl.java
index d81728183d..53c3838ea4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/IOzoneAcl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/IOzoneAcl.java
@@ -28,37 +28,6 @@ import java.util.List;
  * Interface for Ozone Acl management.
  */
 public interface IOzoneAcl {
-
-  /**
-   * Add acl for Ozone object. Return true if acl is added successfully else
-   * false.
-   * @param obj Ozone object for which acl should be added.
-   * @param acl ozone acl to be added.
-   *
-   * @throws IOException if there is error.
-   * */
-  boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
-
-  /**
-   * Remove acl for Ozone object. Return true if acl is removed successfully
-   * else false.
-   * @param obj Ozone object.
-   * @param acl Ozone acl to be removed.
-   *
-   * @throws IOException if there is error.
-   * */
-  boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
-
-  /**
-   * Acls to be set for given Ozone object. This operations reset ACL for
-   * given object to list of ACLs provided in argument.
-   * @param obj Ozone object.
-   * @param acls List of acls.
-   *
-   * @throws IOException if there is error.
-   * */
-  boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException;
-
   /**
    * Returns list of ACLs for given Ozone object.
    * @param obj Ozone object.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 3ff8029c4e..d901336223 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -52,8 +52,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
@@ -173,23 +171,6 @@ public class KeyManagerImpl implements KeyManager {
 
   private BackgroundService openKeyCleanupService;
 
-  @VisibleForTesting
-  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
-      OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
-      OzoneBlockTokenSecretManager secretManager) {
-    this(null, new ScmClient(scmBlockClient, null), metadataManager,
-        conf, omId, secretManager, null, null);
-  }
-
-  @VisibleForTesting
-  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
-      StorageContainerLocationProtocol scmContainerClient,
-      OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
-      OzoneBlockTokenSecretManager secretManager) {
-    this(null, new ScmClient(scmBlockClient, scmContainerClient),
-        metadataManager, conf, omId, secretManager, null, null);
-  }
-
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
       OzoneConfiguration conf, String omId) {
     this (om, scmClient, om.getMetadataManager(), conf, omId,
@@ -693,13 +674,12 @@ public class KeyManagerImpl implements KeyManager {
     boolean isTruncated = false;
     int nextPartNumberMarker = 0;
     BucketLayout bucketLayout = BucketLayout.DEFAULT;
-    if (ozoneManager != null) {
-      String buckKey = ozoneManager.getMetadataManager()
-          .getBucketKey(volumeName, bucketName);
-      OmBucketInfo buckInfo =
-          ozoneManager.getMetadataManager().getBucketTable().get(buckKey);
-      bucketLayout = buckInfo.getBucketLayout();
-    }
+
+    String buckKey = metadataManager.
+          getBucketKey(volumeName, bucketName);
+    OmBucketInfo buckInfo =
+          metadataManager.getBucketTable().get(buckKey);
+    bucketLayout = buckInfo.getBucketLayout();
 
     metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
         bucketName);
@@ -828,7 +808,7 @@ public class KeyManagerImpl implements KeyManager {
 
   private String getMultipartOpenKeyFSO(String volumeName, String bucketName,
       String keyName, String uploadID) throws IOException {
-    OMMetadataManager metaMgr = ozoneManager.getMetadataManager();
+    OMMetadataManager metaMgr = metadataManager;
     String fileName = OzoneFSUtils.getFileName(keyName);
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
     final long volumeId = metaMgr.getVolumeId(volumeName);
@@ -843,152 +823,6 @@ public class KeyManagerImpl implements KeyManager {
     return multipartKey;
   }
 
-  /**
-   * Add acl for Ozone object. Return true if acl is added successfully else
-   * false.
-   *
-   * @param obj Ozone object for which acl should be added.
-   * @param acl ozone acl to be added.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    validateOzoneObj(obj);
-    String volume = obj.getVolumeName();
-    String bucket = obj.getBucketName();
-    String keyName = obj.getKeyName();
-    boolean changed = false;
-
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
-    try {
-      OMFileRequest.validateBucket(metadataManager, volume, bucket);
-      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
-      BucketLayout bucketLayout =
-          getBucketLayout(metadataManager, volume, bucket);
-      OmKeyInfo keyInfo = metadataManager
-          .getKeyTable(bucketLayout)
-          .get(objectKey);
-      if (keyInfo == null) {
-        throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND);
-      }
-
-      if (keyInfo.getAcls() == null) {
-        keyInfo.setAcls(new ArrayList<>());
-      }
-      changed = keyInfo.addAcl(acl);
-      if (changed) {
-        metadataManager
-            .getKeyTable(getBucketLayout(metadataManager, volume, bucket))
-            .put(objectKey, keyInfo);
-      }
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Add acl operation failed for key:{}/{}/{}", volume,
-            bucket, keyName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
-    }
-    return changed;
-  }
-
-  /**
-   * Remove acl for Ozone object. Return true if acl is removed successfully
-   * else false.
-   *
-   * @param obj Ozone object.
-   * @param acl Ozone acl to be removed.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    validateOzoneObj(obj);
-    String volume = obj.getVolumeName();
-    String bucket = obj.getBucketName();
-    String keyName = obj.getKeyName();
-    boolean changed = false;
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
-    try {
-      OMFileRequest.validateBucket(metadataManager, volume, bucket);
-      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
-      BucketLayout bucketLayout =
-          getBucketLayout(metadataManager, volume, bucket);
-      OmKeyInfo keyInfo = metadataManager
-          .getKeyTable(bucketLayout)
-          .get(objectKey);
-      if (keyInfo == null) {
-        throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND);
-      }
-
-      changed = keyInfo.removeAcl(acl);
-      if (changed) {
-        metadataManager
-            .getKeyTable(getBucketLayout(metadataManager, volume, bucket))
-            .put(objectKey, keyInfo);
-      }
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Remove acl operation failed for key:{}/{}/{}", volume,
-            bucket, keyName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
-    }
-    return changed;
-  }
-
-  /**
-   * Acls to be set for given Ozone object. This operations reset ACL for given
-   * object to list of ACLs provided in argument.
-   *
-   * @param obj Ozone object.
-   * @param acls List of acls.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
-    validateOzoneObj(obj);
-    String volume = obj.getVolumeName();
-    String bucket = obj.getBucketName();
-    String keyName = obj.getKeyName();
-    boolean changed = false;
-
-    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
-    try {
-      OMFileRequest.validateBucket(metadataManager, volume, bucket);
-      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
-      BucketLayout bucketLayout =
-          getBucketLayout(metadataManager, volume, bucket);
-      OmKeyInfo keyInfo = metadataManager
-          .getKeyTable(bucketLayout)
-          .get(objectKey);
-      if (keyInfo == null) {
-        throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND);
-      }
-
-      changed = keyInfo.setAcls(acls);
-
-      if (changed) {
-        metadataManager
-            .getKeyTable(getBucketLayout(metadataManager, volume, bucket))
-            .put(objectKey, keyInfo);
-      }
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Set acl operation failed for key:{}/{}/{}", volume,
-            bucket, keyName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
-    }
-    return changed;
-  }
-
   /**
    * Returns list of ACLs for given Ozone object.
    *
@@ -1053,17 +887,15 @@ public class KeyManagerImpl implements KeyManager {
         .build();
 
     BucketLayout bucketLayout = BucketLayout.DEFAULT;
-    if (ozoneManager != null) {
-      String buckKey =
-          ozoneManager.getMetadataManager().getBucketKey(volume, bucket);
-      OmBucketInfo buckInfo = null;
-      try {
-        buckInfo =
-            ozoneManager.getMetadataManager().getBucketTable().get(buckKey);
-        bucketLayout = buckInfo.getBucketLayout();
-      } catch (IOException e) {
-        LOG.error("Failed to get bucket for the key: " + buckKey, e);
-      }
+    String buckKey =
+        metadataManager.getBucketKey(volume, bucket);
+    OmBucketInfo buckInfo = null;
+    try {
+      buckInfo =
+          metadataManager.getBucketTable().get(buckKey);
+      bucketLayout = buckInfo.getBucketLayout();
+    } catch (IOException e) {
+      LOG.error("Failed to get bucket for the key: " + buckKey, e);
     }
 
     metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volume, bucket);
@@ -2534,15 +2366,10 @@ public class KeyManagerImpl implements KeyManager {
 
   public boolean isBucketFSOptimized(String volName, String buckName)
       throws IOException {
-    // This will never be null in reality but can be null in unit test cases.
-    // Added safer check for unit testcases.
-    if (ozoneManager == null) {
-      return false;
-    }
     String buckKey =
-        ozoneManager.getMetadataManager().getBucketKey(volName, buckName);
+        metadataManager.getBucketKey(volName, buckName);
     OmBucketInfo buckInfo =
-        ozoneManager.getMetadataManager().getBucketTable().get(buckKey);
+        metadataManager.getBucketTable().get(buckKey);
     if (buckInfo != null) {
       return buckInfo.getBucketLayout().isFileSystemOptimized();
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 2884f44cf0..d700b2a620 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
  */
 @InterfaceAudience.Private
 @Metrics(about = "Ozone Manager Metrics", context = "dfs")
-public class OMMetrics {
+public class OMMetrics implements OmMetadataReaderMetrics {
   private static final String SOURCE_NAME =
       OMMetrics.class.getSimpleName();
 
@@ -373,6 +373,7 @@ public class OMMetrics {
     numBucketLists.incr();
   }
 
+  @Override
   public void incNumKeyLists() {
     numKeyOps.incr();
     numKeyLists.incr();
@@ -533,12 +534,14 @@ public class OMMetrics {
     numTenantTenantUserLists.incr();
   }
 
+  @Override
   public void incNumGetFileStatus() {
     numKeyOps.incr();
     numFSOps.incr();
     numGetFileStatus.incr();
   }
 
+  @Override
   public void incNumGetFileStatusFails() {
     numGetFileStatusFails.incr();
   }
@@ -563,22 +566,26 @@ public class OMMetrics {
     numCreateFileFails.incr();
   }
 
+  @Override
   public void incNumLookupFile() {
     numKeyOps.incr();
     numFSOps.incr();
     numLookupFile.incr();
   }
 
+  @Override
   public void incNumLookupFileFails() {
     numLookupFileFails.incr();
   }
 
+  @Override
   public void incNumListStatus() {
     numKeyOps.incr();
     numFSOps.incr();
     numListStatus.incr();
   }
 
+  @Override
   public void incNumListStatusFails() {
     numListStatusFails.incr();
   }
@@ -636,11 +643,13 @@ public class OMMetrics {
     numKeyAllocateFails.incr();
   }
 
+  @Override
   public void incNumKeyLookups() {
     numKeyOps.incr();
     numKeyLookup.incr();
   }
 
+  @Override
   public void incNumKeyLookupFails() {
     numKeyLookupFails.incr();
   }
@@ -684,6 +693,7 @@ public class OMMetrics {
     numBucketListFails.incr();
   }
 
+  @Override
   public void incNumKeyListFails() {
     numKeyListFails.incr();
   }
@@ -724,6 +734,7 @@ public class OMMetrics {
     numSetAcl.incr();
   }
 
+  @Override
   public void incNumGetAcl() {
     numGetAcl.incr();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
new file mode 100644
index 0000000000..8f541a578b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.om.KeyManagerImpl.getRemoteUser;
+import static org.apache.hadoop.ozone.om.OzoneManager.getS3Auth;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
+import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+
+/**
+ * OM Metadata Reading class for the OM and Snapshot managers.
+ *
+ * This abstraction manages all the metadata key/acl reading
+ * from a rocksDb instance, for both the OM and OM snapshots.
+ */
+public class OmMetadataReader implements Auditor {
+  private final KeyManager keyManager;
+  private final PrefixManager prefixManager;
+  private final VolumeManager volumeManager;
+  private final BucketManager bucketManager;
+  private final OMMetadataManager metadataManager;
+  private final OzoneManager ozoneManager;
+  private final boolean isAclEnabled;
+  private final IAccessAuthorizer accessAuthorizer;
+  private final boolean isNativeAuthorizerEnabled;
+  private final OmMetadataReaderMetrics metrics;
+  private final Logger log;
+  private final AuditLogger audit;
+
+  public OmMetadataReader(KeyManager keyManager,
+                   PrefixManager prefixManager,
+                   OMMetadataManager metadataManager,
+                   OzoneManager ozoneManager,
+                   Logger log,
+                   AuditLogger audit,
+                   OmMetadataReaderMetrics omMetadataReaderMetrics) {
+    this.keyManager = keyManager;
+    this.bucketManager = ozoneManager.getBucketManager();
+    this.volumeManager = ozoneManager.getVolumeManager();
+    this.prefixManager = prefixManager;
+    this.metadataManager = metadataManager;
+    OzoneConfiguration configuration = ozoneManager.getConfiguration();
+    this.ozoneManager = ozoneManager;
+    this.isAclEnabled = ozoneManager.getAclsEnabled();
+    this.log = log;
+    this.audit = audit;
+    boolean allowListAllVolumes = ozoneManager.getAllowListAllVolumes();
+    metrics = omMetadataReaderMetrics;
+    if (isAclEnabled) {
+      accessAuthorizer = getACLAuthorizerInstance(configuration);
+      if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
+        OzoneNativeAuthorizer authorizer =
+            (OzoneNativeAuthorizer) accessAuthorizer;
+        isNativeAuthorizerEnabled = true;
+        authorizer.setVolumeManager(volumeManager);
+        authorizer.setBucketManager(bucketManager);
+        authorizer.setKeyManager(keyManager);
+        authorizer.setPrefixManager(prefixManager);
+        authorizer.setOzoneAdmins(ozoneManager.getOmAdminUsernames());
+        authorizer.setAllowListAllVolumes(allowListAllVolumes);
+      } else {
+        isNativeAuthorizerEnabled = false;
+      }
+    } else {
+      accessAuthorizer = null;
+      isNativeAuthorizerEnabled = false;
+    }
+  }
+
+  /**
+   * Looks up a key after resolving bucket links; the read is audited.
+   *
+   * @param args - attributes of the key.
+   * @return OmKeyInfo - the info about the requested key.
+   * @throws IOException if the ACL check or the key lookup fails.
+   */
+  public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
+
+    if (isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
+    }
+
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
+    try {
+      metrics.incNumKeyLookups();
+      return keyManager.lookupKey(args, getClientAddress());
+    } catch (Exception ex) {
+      metrics.incNumKeyLookupFails();
+      auditSuccess = false;
+      audit.logReadFailure(buildAuditMessageForFailure(OMAction.READ_KEY,
+          auditMap, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        audit.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
+            auditMap));
+      }
+    }
+  }
+
+  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
+      String startKey, long numEntries, boolean allowPartialPrefixes)
+      throws IOException {
+
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
+
+    if (isAclEnabled) {
+      checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
+          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
+    }
+
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
+    try {
+      metrics.incNumListStatus();
+      return keyManager.listStatus(args, recursive, startKey, numEntries,
+              getClientAddress(), allowPartialPrefixes);
+    } catch (Exception ex) {
+      metrics.incNumListStatusFails();
+      auditSuccess = false;
+      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_STATUS,
+          auditMap, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        audit.logReadSuccess(buildAuditMessageForSuccess(
+            OMAction.LIST_STATUS, auditMap));
+      }
+    }
+  }
+  
+  public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
+
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
+    try {
+      metrics.incNumGetFileStatus();
+      return keyManager.getFileStatus(args, getClientAddress());
+    } catch (IOException ex) {
+      metrics.incNumGetFileStatusFails();
+      auditSuccess = false;
+      audit.logReadFailure(
+          buildAuditMessageForFailure(OMAction.GET_FILE_STATUS, auditMap, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        audit.logReadSuccess(
+            buildAuditMessageForSuccess(OMAction.GET_FILE_STATUS, auditMap));
+      }
+    }
+  }
+
+  public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
+
+    if (isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
+    }
+
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
+    try {
+      metrics.incNumLookupFile();
+      return keyManager.lookupFile(args, getClientAddress());
+    } catch (Exception ex) {
+      metrics.incNumLookupFileFails();
+      auditSuccess = false;
+      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LOOKUP_FILE,
+          auditMap, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        audit.logReadSuccess(buildAuditMessageForSuccess(
+            OMAction.LOOKUP_FILE, auditMap));
+      }
+    }
+  }
+
+  public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+      String startKey, String keyPrefix, int maxKeys) throws IOException {
+
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(
+        Pair.of(volumeName, bucketName));
+
+    if (isAclEnabled) {
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
+          bucket.realVolume(), bucket.realBucket(), keyPrefix);
+    }
+
+    boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit();
+    auditMap.put(OzoneConsts.START_KEY, startKey);
+    auditMap.put(OzoneConsts.MAX_KEYS, String.valueOf(maxKeys));
+    auditMap.put(OzoneConsts.KEY_PREFIX, keyPrefix);
+
+    try {
+      metrics.incNumKeyLists();
+      return keyManager.listKeys(bucket.realVolume(), bucket.realBucket(),
+          startKey, keyPrefix, maxKeys);
+    } catch (IOException ex) {
+      metrics.incNumKeyListFails();
+      auditSuccess = false;
+      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_KEYS,
+          auditMap, ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        audit.logReadSuccess(buildAuditMessageForSuccess(OMAction.LIST_KEYS,
+            auditMap));
+      }
+    }
+  }
+
+  /**
+   * Returns the list of ACLs for the given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is an error.
+   */
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    boolean auditSuccess = true;
+
+    try {
+      if (isAclEnabled) {
+        checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.READ_ACL,
+            obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
+      }
+      metrics.incNumGetAcl();
+      switch (obj.getResourceType()) {
+      case VOLUME:
+        return volumeManager.getAcl(obj);
+      case BUCKET:
+        return bucketManager.getAcl(obj);
+      case KEY:
+        return keyManager.getAcl(obj);
+      case PREFIX:
+        return prefixManager.getAcl(obj);
+
+      default:
+        throw new OMException("Unexpected resource type: " +
+            obj.getResourceType(), INVALID_REQUEST);
+      }
+    } catch (Exception ex) {
+      auditSuccess = false;
+      audit.logReadFailure(
+          buildAuditMessageForFailure(OMAction.GET_ACL, obj.toAuditMap(), ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        audit.logReadSuccess(
+            buildAuditMessageForSuccess(OMAction.GET_ACL, obj.toAuditMap()));
+      }
+    }
+  }
+
+  /**
+   * Checks if the current caller has ACL permissions.
+   *
+   * @param resType - Type of ozone resource, e.g. volume, bucket.
+   * @param store   - Store type, e.g. Ozone, S3.
+   * @param acl     - type of access to be checked.
+   * @param vol     - name of volume
+   * @param bucket  - bucket name
+   * @param key     - key
+   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
+   */
+  void checkAcls(ResourceType resType, StoreType store,
+      ACLType acl, String vol, String bucket, String key)
+      throws IOException {
+    UserGroupInformation user;
+    if (getS3Auth() != null) {
+      String principal =
+          OzoneAclUtils.accessIdToUserPrincipal(getS3Auth().getAccessId());
+      user = UserGroupInformation.createRemoteUser(principal);
+    } else {
+      user = ProtobufRpcEngine.Server.getRemoteUser();
+    }
+
+    InetAddress remoteIp = ProtobufRpcEngine.Server.getRemoteIp();
+    String volumeOwner = ozoneManager.getVolumeOwner(vol, acl, resType);
+    String bucketOwner = ozoneManager.getBucketOwner(vol, bucket, acl, 
resType);
+
+    OzoneAclUtils.checkAllAcls(this, resType, store, acl,
+        vol, bucket, key, volumeOwner, bucketOwner,
+        user != null ? user : getRemoteUser(),
+        remoteIp != null ? remoteIp :
+            ozoneManager.getOmRpcServerAddr().getAddress(),
+        remoteIp != null ? remoteIp.getHostName() :
+            ozoneManager.getOmRpcServerAddr().getHostName());
+  }
+
+  
+  /**
+   * Checks ACLs for the ozone object built from the given names.
+   *
+   * @return true if permission granted, false if permission denied.
+   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
+   *                     and throwIfPermissionDenied set to true.
+   */
+  @SuppressWarnings("parameternumber")
+  public boolean checkAcls(ResourceType resType, StoreType storeType,
+      ACLType aclType, String vol, String bucket, String key,
+      UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
+      boolean throwIfPermissionDenied, String owner)
+      throws OMException {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(resType)
+        .setStoreType(storeType)
+        .setVolumeName(vol)
+        .setBucketName(bucket)
+        .setKeyName(key).build();
+    RequestContext context = RequestContext.newBuilder()
+        .setClientUgi(ugi)
+        .setIp(remoteAddress)
+        .setHost(hostName)
+        .setAclType(ACLIdentityType.USER)
+        .setAclRights(aclType)
+        .setOwnerName(owner)
+        .build();
+
+    return checkAcls(obj, context, throwIfPermissionDenied);
+  }
+
+  /**
+   * Checks ACLs for the ozone object via the access authorizer.
+   *
+   * @return true if permission granted, false if permission denied.
+   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
+   *                     and throwIfPermissionDenied set to true.
+   */
+  public boolean checkAcls(OzoneObj obj, RequestContext context,
+                           boolean throwIfPermissionDenied)
+      throws OMException {
+
+    if (!accessAuthorizer.checkAccess(obj, context)) {
+      if (throwIfPermissionDenied) {
+        String volumeName = obj.getVolumeName() != null ?
+                "Volume:" + obj.getVolumeName() + " " : "";
+        String bucketName = obj.getBucketName() != null ?
+                "Bucket:" + obj.getBucketName() + " " : "";
+        String keyName = obj.getKeyName() != null ?
+                "Key:" + obj.getKeyName() : "";
+        log.warn("User {} doesn't have {} permission to access {} {}{}{}",
+            context.getClientUgi().getUserName(), context.getAclRights(),
+            obj.getResourceType(), volumeName, bucketName, keyName);
+        throw new OMException("User " + context.getClientUgi().getUserName() +
+            " doesn't have " + context.getAclRights() +
+            " permission to access " + obj.getResourceType() + " " +
+            volumeName  + bucketName + keyName, ResultCodes.PERMISSION_DENIED);
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * Returns an instance of {@link IAccessAuthorizer}.
+   * Looks up the configuration to see if there is a custom class specified.
+   * Constructs the instance by passing the configuration directly to the
+   * constructor to achieve thread safety using final fields.
+   *
+   * @param conf configuration consulted for OZONE_ACL_AUTHORIZER_CLASS.
+   * @return the IAccessAuthorizer instance created for the configured class.
+   */
+  private IAccessAuthorizer getACLAuthorizerInstance(OzoneConfiguration conf) {
+    Class<? extends IAccessAuthorizer> clazz = conf.getClass(
+        OZONE_ACL_AUTHORIZER_CLASS, OzoneAccessAuthorizer.class,
+        IAccessAuthorizer.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+
+  private static String getClientAddress() {
+    String clientMachine = Server.getRemoteAddress();
+    if (clientMachine == null) { // not an RPC client; fall back to ""
+      clientMachine = "";
+    }
+    return clientMachine;
+  }
+
+  public AuditMessage buildAuditMessageForSuccess(AuditAction op,
+      Map<String, String> auditMap) {
+
+    return new AuditMessage.Builder()
+        .setUser(getRemoteUserName())
+        .atIp(Server.getRemoteAddress())
+        .forOperation(op)
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.SUCCESS)
+        .build();
+  }
+
+  public AuditMessage buildAuditMessageForFailure(AuditAction op,
+      Map<String, String> auditMap, Throwable throwable) {
+
+    return new AuditMessage.Builder()
+        .setUser(getRemoteUserName())
+        .atIp(Server.getRemoteAddress())
+        .forOperation(op)
+        .withParams(auditMap)
+        .withResult(AuditEventStatus.FAILURE)
+        .withException(throwable)
+        .build();
+  }
+
+  /**
+   * Returns true if OzoneNativeAuthorizer is enabled and false otherwise.
+   *
+   * @return whether the native authorizer is enabled.
+   */
+  public boolean isNativeAuthorizerEnabled() {
+    return isNativeAuthorizerEnabled;
+  }
+
+  public IAccessAuthorizer getAccessAuthorizer() {
+    return accessAuthorizer;
+  }
+
+  private ResourceType getResourceType(OmKeyArgs args) {
+    if (args.getKeyName() == null || args.getKeyName().length() == 0) {
+      return ResourceType.BUCKET;
+    }
+    return ResourceType.KEY;
+  }
+
+  
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReaderMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReaderMetrics.java
new file mode 100644
index 0000000000..3fd7fe383c
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReaderMetrics.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+/**
+ * Interface for the metrics classes used when reading OM metadata.
+ */
+public interface OmMetadataReaderMetrics {
+  void incNumKeyLookups();
+
+  void incNumKeyLookupFails();
+
+  void incNumListStatus();
+
+  void incNumListStatusFails();
+
+  void incNumGetFileStatus();
+
+  void incNumGetFileStatusFails();
+
+  void incNumLookupFile();
+
+  void incNumLookupFileFails();
+
+  void incNumKeyLists();
+
+  void incNumKeyListFails();
+
+  void incNumGetAcl();
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneAclUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneAclUtils.java
index b6eaeca0ac..c0c1d6cdf5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneAclUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneAclUtils.java
@@ -62,7 +62,7 @@ public final class OzoneAclUtils {
 
   /**
    * Check Acls of ozone object with volume owner and bucket owner.
-   * @param ozoneManager
+   * @param omMetadataReader
    * @param resType
    * @param storeType
    * @param aclType
@@ -74,7 +74,7 @@ public final class OzoneAclUtils {
    * @throws IOException
    */
   @SuppressWarnings("parameternumber")
-  public static void checkAllAcls(OzoneManager ozoneManager,
+  public static void checkAllAcls(OmMetadataReader omMetadataReader,
       OzoneObj.ResourceType resType,
       OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
       String vol, String bucket, String key, String volOwner,
@@ -88,7 +88,7 @@ public final class OzoneAclUtils {
     //OzoneNativeAuthorizer differs from Ranger Authorizer as Ranger requires
     // only READ access on parent level access. OzoneNativeAuthorizer has
     // different parent level access based on the child level access type
-    if (ozoneManager.isNativeAuthorizerEnabled()) {
+    if (omMetadataReader.isNativeAuthorizerEnabled()) {
       if (aclType == IAccessAuthorizer.ACLType.CREATE ||
           aclType == IAccessAuthorizer.ACLType.DELETE ||
           aclType == IAccessAuthorizer.ACLType.WRITE_ACL) {
@@ -105,7 +105,7 @@ public final class OzoneAclUtils {
     //For Volume level access we only need to check {OWNER} equal
     // to Volume Owner.
     case VOLUME:
-      ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
+      omMetadataReader.checkAcls(resType, storeType, aclType, vol, bucket, key,
           user, remoteAddress, hostName, true,
           volOwner);
       break;
@@ -116,16 +116,18 @@ public final class OzoneAclUtils {
     // volume owner if current ugi user is volume owner else we need check
     //{OWNER} equals bucket owner for bucket/key/prefix.
     case PREFIX:
-      ozoneManager.checkAcls(OzoneObj.ResourceType.VOLUME, storeType,
+      omMetadataReader.checkAcls(OzoneObj.ResourceType.VOLUME, storeType,
           parentAclRight, vol, bucket, key, user,
           remoteAddress, hostName, true,
           volOwner);
       if (isVolOwner) {
-        ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
+        omMetadataReader.checkAcls(resType, storeType,
+            aclType, vol, bucket, key,
             user, remoteAddress, hostName, true,
             volOwner);
       } else {
-        ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
+        omMetadataReader.checkAcls(resType, storeType,
+            aclType, vol, bucket, key,
             user, remoteAddress, hostName, true,
             bucketOwner);
       }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 1b77ee82f8..8d185242a2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneManagerVersion;
 import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.util.OzoneNetUtils;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
@@ -106,7 +107,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.audit.AuditAction;
-import org.apache.hadoop.ozone.audit.AuditEventStatus;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.AuditMessage;
@@ -151,7 +151,6 @@ import 
org.apache.hadoop.hdds.security.OzoneSecurityException;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
-import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.om.upgrade.OMUpgradeFinalizer;
@@ -174,8 +173,6 @@ import 
org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
-import org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
@@ -194,7 +191,6 @@ import 
org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.KMSUtil;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -213,14 +209,12 @@ import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static 
org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
-import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
 import static org.apache.hadoop.hdds.utils.HAUtils.getScmInfo;
 import static org.apache.hadoop.ozone.OmUtils.MAX_TRXN_ID;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
@@ -353,7 +347,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private final File omMetaDir;
   private boolean isAclEnabled;
   private final boolean isSpnegoEnabled;
-  private IAccessAuthorizer accessAuthorizer;
   private JvmPauseMonitor jvmPauseMonitor;
   private final SecurityConfig secConfig;
   private S3SecretManager s3SecretManager;
@@ -395,8 +388,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
   private boolean isS3MultiTenancyEnabled;
 
-  private boolean isNativeAuthorizerEnabled;
-
   private ExitManager exitManager;
 
   private OzoneManagerPrepareState prepareState;
@@ -434,6 +425,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
   private final boolean isSecurityEnabled;
 
+  // This metadata reader points to the active filesystem
+  private OmMetadataReader omMetadataReader;
+
   @SuppressWarnings("methodlength")
   private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
       throws IOException, AuthenticationException {
@@ -572,6 +566,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
     // Get admin list
     omAdminUsernames = getOzoneAdminsFromConfig(configuration);
+
+    metrics = OMMetrics.create();
     instantiateServices(false);
 
     // Create special volume s3v which is required for S3G.
@@ -588,7 +584,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     initializeRatisDirs(conf);
     initializeRatisServer(isBootstrapping || isForcedBootstrapping);
 
-    metrics = OMMetrics.create();
     omClientProtocolMetrics = ProtocolMessageMetrics
         .create("OmClientProtocol", "Ozone Manager RPC endpoint",
             OzoneManagerProtocolProtos.Type.values());
@@ -710,6 +705,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     prefixManager = new PrefixManagerImpl(metadataManager, isRatisEnabled);
     keyManager = new KeyManagerImpl(this, scmClient, configuration,
         omStorage.getOmId());
+    omMetadataReader = new OmMetadataReader(keyManager, prefixManager,
+        metadataManager, this, LOG, AUDIT, metrics);
 
     if (withNewSnapshot) {
       Integer layoutVersionInDB = getLayoutVersionInDB();
@@ -736,23 +733,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       // restart.
       instantiatePrepareStateOnStartup();
     }
-
-    if (isAclEnabled) {
-      accessAuthorizer = getACLAuthorizerInstance(configuration);
-      if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
-        OzoneNativeAuthorizer authorizer =
-            (OzoneNativeAuthorizer) accessAuthorizer;
-        isNativeAuthorizerEnabled = true;
-        authorizer.setVolumeManager(volumeManager);
-        authorizer.setBucketManager(bucketManager);
-        authorizer.setKeyManager(keyManager);
-        authorizer.setPrefixManager(prefixManager);
-        authorizer.setOzoneAdmins(omAdminUsernames);
-        authorizer.setAllowListAllVolumes(allowListAllVolumes);
-      }
-    } else {
-      accessAuthorizer = null;
-    }
   }
 
   /**
@@ -843,22 +823,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     return cryptoProvider;
   }
 
-  /**
-   * Returns an instance of {@link IAccessAuthorizer}.
-   * Looks up the configuration to see if there is custom class specified.
-   * Constructs the instance by passing the configuration directly to the
-   * constructor to achieve thread safety using final fields.
-   *
-   * @param conf
-   * @return IAccessAuthorizer
-   */
-  private IAccessAuthorizer getACLAuthorizerInstance(OzoneConfiguration conf) {
-    Class<? extends IAccessAuthorizer> clazz = conf.getClass(
-        OZONE_ACL_AUTHORIZER_CLASS, OzoneAccessAuthorizer.class,
-        IAccessAuthorizer.class);
-    return ReflectionUtils.newInstance(clazz, conf);
-  }
-
   @Override
   public void close() throws IOException {
     stop();
@@ -1415,8 +1379,16 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     return prefixManager;
   }
 
+  public VolumeManager getVolumeManager() {
+    return volumeManager;
+  }
+
+  public BucketManager getBucketManager() {
+    return bucketManager;
+  }
+
   public IAccessAuthorizer getAccessAuthorizer() {
-    return accessAuthorizer;
+    return omMetadataReader.getAccessAuthorizer();
   }
 
   /**
@@ -2353,40 +2325,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
-  /**
-   * Checks if current caller has acl permissions.
-   *
-   * @param resType - Type of ozone resource. Ex volume, bucket.
-   * @param store   - Store type. i.e Ozone, S3.
-   * @param acl     - type of access to be checked.
-   * @param vol     - name of volume
-   * @param bucket  - bucket name
-   * @param key     - key
-   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
-   */
-  private void checkAcls(ResourceType resType, StoreType store,
-      ACLType acl, String vol, String bucket, String key)
-      throws IOException {
-    UserGroupInformation user;
-    if (getS3Auth() != null) {
-      String principal =
-          OzoneAclUtils.accessIdToUserPrincipal(getS3Auth().getAccessId());
-      user = UserGroupInformation.createRemoteUser(principal);
-    } else {
-      user = ProtobufRpcEngine.Server.getRemoteUser();
-    }
-
-    InetAddress remoteIp = ProtobufRpcEngine.Server.getRemoteIp();
-    String volumeOwner = getVolumeOwner(vol, acl, resType);
-    String bucketOwner = getBucketOwner(vol, bucket, acl, resType);
-
-    OzoneAclUtils.checkAllAcls(this, resType, store, acl,
-        vol, bucket, key, volumeOwner, bucketOwner,
-        user != null ? user : getRemoteUser(),
-        remoteIp != null ? remoteIp : omRpcAddress.getAddress(),
-        remoteIp != null ? remoteIp.getHostName() : omRpcAddress.getHostName());
-  }
-
   public boolean isOwner(UserGroupInformation callerUgi, String ownerName) {
     if (ownerName == null) {
       return false;
@@ -2526,40 +2464,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         .setOwnerName(owner)
         .build();
 
-    return checkAcls(obj, context, throwIfPermissionDenied);
-  }
-
-  /**
-   * CheckAcls for the ozone object.
-   *
-   * @return true if permission granted, false if permission denied.
-   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
-   *                     and throwOnPermissionDenied set to true.
-   */
-  public boolean checkAcls(OzoneObj obj, RequestContext context,
-                           boolean throwIfPermissionDenied)
-      throws OMException {
-
-    if (!accessAuthorizer.checkAccess(obj, context)) {
-      if (throwIfPermissionDenied) {
-        String volumeName = obj.getVolumeName() != null ?
-                "Volume:" + obj.getVolumeName() + " " : "";
-        String bucketName = obj.getBucketName() != null ?
-                "Bucket:" + obj.getBucketName() + " " : "";
-        String keyName = obj.getKeyName() != null ?
-                "Key:" + obj.getKeyName() : "";
-        LOG.warn("User {} doesn't have {} permission to access {} {}{}{}",
-            context.getClientUgi().getUserName(), context.getAclRights(),
-            obj.getResourceType(), volumeName, bucketName, keyName);
-        throw new OMException("User " + context.getClientUgi().getUserName() +
-            " doesn't have " + context.getAclRights() +
-            " permission to access " + obj.getResourceType() + " " +
-            volumeName  + bucketName + keyName, ResultCodes.PERMISSION_DENIED);
-      }
-      return false;
-    } else {
-      return true;
-    }
+    return omMetadataReader.checkAcls(obj, context, throwIfPermissionDenied);
   }
 
 
@@ -2573,6 +2478,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return isAclEnabled;
   }
 
+  public boolean getAllowListAllVolumes() {
+    return allowListAllVolumes;
+  }
+
+  public OmMetadataReader getOmMetadataReader() {
+    return omMetadataReader;
+  }
+
   /**
    * Return true if SPNEGO auth is enabled for OM HTTP server, otherwise false.
    *
@@ -2592,7 +2505,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
     if (isAclEnabled) {
-      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.READ, volume,
+      omMetadataReader.checkAcls(ResourceType.VOLUME,
+          StoreType.OZONE, ACLType.READ, volume,
           null, null);
     }
 
@@ -2701,7 +2615,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       if (!allowListAllVolumes) {
         // Only admin can list all volumes when disallowed in config
         if (isAclEnabled) {
-          checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST,
+          omMetadataReader.checkAcls(ResourceType.VOLUME,
+              StoreType.OZONE, ACLType.LIST,
               OzoneConsts.OZONE_ROOT, null, null);
         }
       }
@@ -2728,7 +2643,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       String startKey, String prefix, int maxNumOfBuckets)
       throws IOException {
     if (isAclEnabled) {
-      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST, volumeName,
+      omMetadataReader.checkAcls(ResourceType.VOLUME,
+          StoreType.OZONE, ACLType.LIST, volumeName,
           null, null);
     }
     boolean auditSuccess = true;
@@ -2767,7 +2683,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public OmBucketInfo getBucketInfo(String volume, String bucket)
       throws IOException {
     if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ, volume,
+      omMetadataReader.checkAcls(ResourceType.BUCKET,
+          StoreType.OZONE, ACLType.READ, volume,
           bucket, null);
     }
     boolean auditSuccess = true;
@@ -2801,68 +2718,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
-    ResolvedBucket bucket = resolveBucketLink(args);
-
-    if (isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
-          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
-    }
-
-    boolean auditSuccess = true;
-    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
-
-    args = bucket.update(args);
-
-    try {
-      metrics.incNumKeyLookups();
-      return keyManager.lookupKey(args, getClientAddress());
-    } catch (Exception ex) {
-      metrics.incNumKeyLookupFails();
-      auditSuccess = false;
-      AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.READ_KEY,
-          auditMap, ex));
-      throw ex;
-    } finally {
-      if (auditSuccess) {
-        AUDIT.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
-            auditMap));
-      }
-    }
+    return omMetadataReader.lookupKey(args);
   }
 
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {
-
-    ResolvedBucket bucket = resolveBucketLink(Pair.of(volumeName, bucketName));
-
-    if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
-          bucket.realVolume(), bucket.realBucket(), keyPrefix);
-    }
-
-    boolean auditSuccess = true;
-    Map<String, String> auditMap = bucket.audit();
-    auditMap.put(OzoneConsts.START_KEY, startKey);
-    auditMap.put(OzoneConsts.MAX_KEYS, String.valueOf(maxKeys));
-    auditMap.put(OzoneConsts.KEY_PREFIX, keyPrefix);
-
-    try {
-      metrics.incNumKeyLists();
-      return keyManager.listKeys(bucket.realVolume(), bucket.realBucket(),
-          startKey, keyPrefix, maxKeys);
-    } catch (IOException ex) {
-      metrics.incNumKeyListFails();
-      auditSuccess = false;
-      AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_KEYS,
-          auditMap, ex));
-      throw ex;
-    } finally {
-      if (auditSuccess) {
-        AUDIT.logReadSuccess(buildAuditMessageForSuccess(OMAction.LIST_KEYS,
-            auditMap));
-      }
-    }
+    return omMetadataReader.listKeys(volumeName, bucketName,
+        startKey, keyPrefix, maxKeys);
   }
 
   @Override
@@ -2873,7 +2736,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // bucket links not supported
 
     if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
+      omMetadataReader.checkAcls(ResourceType.BUCKET,
+          StoreType.OZONE, ACLType.LIST,
           volumeName, bucketName, keyPrefix);
     }
 
@@ -2915,28 +2779,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public AuditMessage buildAuditMessageForSuccess(AuditAction op,
       Map<String, String> auditMap) {
-
-    return new AuditMessage.Builder()
-        .setUser(getRemoteUserName())
-        .atIp(Server.getRemoteAddress())
-        .forOperation(op)
-        .withParams(auditMap)
-        .withResult(AuditEventStatus.SUCCESS)
-        .build();
+    return omMetadataReader.buildAuditMessageForSuccess(op, auditMap);
   }
 
   @Override
   public AuditMessage buildAuditMessageForFailure(AuditAction op,
       Map<String, String> auditMap, Throwable throwable) {
-
-    return new AuditMessage.Builder()
-        .setUser(getRemoteUserName())
-        .atIp(Server.getRemoteAddress())
-        .forOperation(op)
-        .withParams(auditMap)
-        .withResult(AuditEventStatus.FAILURE)
-        .withException(throwable)
-        .build();
+    return omMetadataReader.buildAuditMessageForFailure(op,
+        auditMap, throwable);
   }
 
   private void registerMXBean() {
@@ -2953,14 +2803,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
-  private static String getClientAddress() {
-    String clientMachine = Server.getRemoteAddress();
-    if (clientMachine == null) { //not a RPC client
-      clientMachine = "";
-    }
-    return clientMachine;
-  }
-
   @Override
   public String getRpcPort() {
     return "" + omRpcAddress.getPort();
@@ -3455,66 +3297,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
-    ResolvedBucket bucket = resolveBucketLink(args);
-
-    boolean auditSuccess = true;
-    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
-
-    args = bucket.update(args);
-
-    try {
-      metrics.incNumGetFileStatus();
-      return keyManager.getFileStatus(args, getClientAddress());
-    } catch (IOException ex) {
-      metrics.incNumGetFileStatusFails();
-      auditSuccess = false;
-      AUDIT.logReadFailure(
-          buildAuditMessageForFailure(OMAction.GET_FILE_STATUS, auditMap, ex));
-      throw ex;
-    } finally {
-      if (auditSuccess) {
-        AUDIT.logReadSuccess(
-            buildAuditMessageForSuccess(OMAction.GET_FILE_STATUS, auditMap));
-      }
-    }
-  }
-
-  private ResourceType getResourceType(OmKeyArgs args) {
-    if (args.getKeyName() == null || args.getKeyName().length() == 0) {
-      return ResourceType.BUCKET;
-    }
-    return ResourceType.KEY;
+    return omMetadataReader.getFileStatus(args);
   }
 
   @Override
   public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
-    ResolvedBucket bucket = resolveBucketLink(args);
-
-    if (isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
-          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
-    }
-
-    boolean auditSuccess = true;
-    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
-
-    args = bucket.update(args);
-
-    try {
-      metrics.incNumLookupFile();
-      return keyManager.lookupFile(args, getClientAddress());
-    } catch (Exception ex) {
-      metrics.incNumLookupFileFails();
-      auditSuccess = false;
-      AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.LOOKUP_FILE,
-          auditMap, ex));
-      throw ex;
-    } finally {
-      if (auditSuccess) {
-        AUDIT.logReadSuccess(buildAuditMessageForSuccess(
-            OMAction.LOOKUP_FILE, auditMap));
-      }
-    }
+    return omMetadataReader.lookupFile(args);
   }
 
   @Override
@@ -3528,34 +3316,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       String startKey, long numEntries, boolean allowPartialPrefixes)
       throws IOException {
 
-    ResolvedBucket bucket = resolveBucketLink(args);
-
-    if (isAclEnabled) {
-      checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
-          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
-    }
-
-    boolean auditSuccess = true;
-    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
-
-    args = bucket.update(args);
-
-    try {
-      metrics.incNumListStatus();
-      return keyManager.listStatus(args, recursive, startKey, numEntries,
-              getClientAddress(), allowPartialPrefixes);
-    } catch (Exception ex) {
-      metrics.incNumListStatusFails();
-      auditSuccess = false;
-      AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_STATUS,
-          auditMap, ex));
-      throw ex;
-    } finally {
-      if (auditSuccess) {
-        AUDIT.logReadSuccess(buildAuditMessageForSuccess(
-            OMAction.LIST_STATUS, auditMap));
-      }
-    }
+    return omMetadataReader.listStatus(args, recursive,
+        startKey, numEntries, allowPartialPrefixes);
   }
 
   /**
@@ -3566,39 +3328,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
-    boolean auditSuccess = true;
-
-    try {
-      if (isAclEnabled) {
-        checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.READ_ACL,
-            obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
-      }
-      metrics.incNumGetAcl();
-      switch (obj.getResourceType()) {
-      case VOLUME:
-        return volumeManager.getAcl(obj);
-      case BUCKET:
-        return bucketManager.getAcl(obj);
-      case KEY:
-        return keyManager.getAcl(obj);
-      case PREFIX:
-        return prefixManager.getAcl(obj);
-
-      default:
-        throw new OMException("Unexpected resource type: " +
-            obj.getResourceType(), INVALID_REQUEST);
-      }
-    } catch (Exception ex) {
-      auditSuccess = false;
-      AUDIT.logReadFailure(
-          buildAuditMessageForFailure(OMAction.GET_ACL, obj.toAuditMap(), ex));
-      throw ex;
-    } finally {
-      if (auditSuccess) {
-        AUDIT.logReadSuccess(
-            buildAuditMessageForSuccess(OMAction.GET_ACL, obj.toAuditMap()));
-      }
-    }
+    return omMetadataReader.getAcl(obj);
   }
 
   /**
@@ -4097,15 +3827,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
-  /**
-   * Returns true if OzoneNativeAuthorizer is enabled and false if otherwise.
-   *
-   * @return if native authorizer is enabled.
-   */
-  public boolean isNativeAuthorizerEnabled() {
-    return isNativeAuthorizerEnabled;
-  }
-
   @VisibleForTesting
   public boolean isRunning() {
     return omState == State.RUNNING;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
index 367bbf3ebd..8c0b9150c3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
@@ -89,110 +89,6 @@ public class PrefixManagerImpl implements PrefixManager {
     return metadataManager;
   }
 
-  /**
-   * Add acl for Ozone object. Return true if acl is added successfully else
-   * false.
-   *
-   * @param obj Ozone object for which acl should be added.
-   * @param acl ozone acl to be added.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    validateOzoneObj(obj);
-
-    String prefixPath = obj.getPath();
-    metadataManager.getLock().acquireWriteLock(PREFIX_LOCK, prefixPath);
-    try {
-      OmPrefixInfo prefixInfo =
-          metadataManager.getPrefixTable().get(prefixPath);
-
-      OMPrefixAclOpResult omPrefixAclOpResult = addAcl(obj, acl, prefixInfo,
-          0L);
-
-      return omPrefixAclOpResult.isSuccess();
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Add acl operation failed for prefix path:{} acl:{}",
-            prefixPath, acl, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(PREFIX_LOCK, prefixPath);
-    }
-  }
-
-  /**
-   * Remove acl for Ozone object. Return true if acl is removed successfully
-   * else false.
-   *
-   * @param obj Ozone object.
-   * @param acl Ozone acl to be removed.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    validateOzoneObj(obj);
-    String prefixPath = obj.getPath();
-    metadataManager.getLock().acquireWriteLock(PREFIX_LOCK, prefixPath);
-    try {
-      OmPrefixInfo prefixInfo =
-          metadataManager.getPrefixTable().get(prefixPath);
-      OMPrefixAclOpResult omPrefixAclOpResult = removeAcl(obj, acl, prefixInfo);
-
-      if (!omPrefixAclOpResult.isSuccess()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("acl {} does not exist for prefix path {} ",
-              acl, prefixPath);
-        }
-        return false;
-      }
-
-      return omPrefixAclOpResult.isSuccess();
-
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Remove prefix acl operation failed for prefix path:{}" +
-            " acl:{}", prefixPath, acl, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(PREFIX_LOCK, prefixPath);
-    }
-  }
-
-  /**
-   * Acls to be set for given Ozone object. This operations reset ACL for given
-   * object to list of ACLs provided in argument.
-   *
-   * @param obj Ozone object.
-   * @param acls List of acls.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
-    validateOzoneObj(obj);
-    String prefixPath = obj.getPath();
-    metadataManager.getLock().acquireWriteLock(PREFIX_LOCK, prefixPath);
-    try {
-      OmPrefixInfo prefixInfo =
-          metadataManager.getPrefixTable().get(prefixPath);
-
-      OMPrefixAclOpResult omPrefixAclOpResult = setAcl(obj, acls, prefixInfo,
-          0L);
-
-      return omPrefixAclOpResult.isSuccess();
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Set prefix acl operation failed for prefix path:{} acls:{}",
-            prefixPath, acls, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(PREFIX_LOCK, prefixPath);
-    }
-  }
-
   /**
    * Returns list of ACLs for given Ozone object.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index 7041d7b969..11e261f41c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -100,143 +100,6 @@ public class VolumeManagerImpl implements VolumeManager {
     }
   }
 
-  /**
-   * Add acl for Ozone object. Return true if acl is added successfully else
-   * false.
-   *
-   * @param obj Ozone object for which acl should be added.
-   * @param acl ozone acl to be added.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    Objects.requireNonNull(obj);
-    Objects.requireNonNull(acl);
-    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
-      throw new IllegalArgumentException("Unexpected argument passed to " +
-          "VolumeManager. OzoneObj type:" + obj.getResourceType());
-    }
-    String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
-    try {
-      String dbVolumeKey = metadataManager.getVolumeKey(volume);
-      OmVolumeArgs volumeArgs =
-          metadataManager.getVolumeTable().get(dbVolumeKey);
-      if (volumeArgs == null) {
-        LOG.debug("volume:{} does not exist", volume);
-        throw new OMException("Volume " + volume + " is not found",
-            ResultCodes.VOLUME_NOT_FOUND);
-      }
-      if (volumeArgs.addAcl(acl)) {
-        metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
-        return true;
-      }
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Add acl operation failed for volume:{} acl:{}",
-            volume, acl, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
-    }
-
-    return false;
-  }
-
-  /**
-   * Remove acl for Ozone object. Return true if acl is removed successfully
-   * else false.
-   *
-   * @param obj Ozone object.
-   * @param acl Ozone acl to be removed.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
-    Objects.requireNonNull(obj);
-    Objects.requireNonNull(acl);
-    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
-      throw new IllegalArgumentException("Unexpected argument passed to " +
-          "VolumeManager. OzoneObj type:" + obj.getResourceType());
-    }
-    String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
-    try {
-      String dbVolumeKey = metadataManager.getVolumeKey(volume);
-      OmVolumeArgs volumeArgs =
-          metadataManager.getVolumeTable().get(dbVolumeKey);
-      if (volumeArgs == null) {
-        LOG.debug("volume:{} does not exist", volume);
-        throw new OMException("Volume " + volume + " is not found",
-            ResultCodes.VOLUME_NOT_FOUND);
-      }
-      if (volumeArgs.removeAcl(acl)) {
-        metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
-        return true;
-      }
-
-      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
-      //return volumeArgs.getAclMap().hasAccess(userAcl);
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Remove acl operation failed for volume:{} acl:{}",
-            volume, acl, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
-    }
-
-    return false;
-  }
-
-  /**
-   * Acls to be set for given Ozone object. This operations reset ACL for given
-   * object to list of ACLs provided in argument.
-   *
-   * @param obj Ozone object.
-   * @param acls List of acls.
-   * @throws IOException if there is error.
-   */
-  @Override
-  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
-    Objects.requireNonNull(obj);
-    Objects.requireNonNull(acls);
-
-    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
-      throw new IllegalArgumentException("Unexpected argument passed to " +
-          "VolumeManager. OzoneObj type:" + obj.getResourceType());
-    }
-    String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
-    try {
-      String dbVolumeKey = metadataManager.getVolumeKey(volume);
-      OmVolumeArgs volumeArgs =
-          metadataManager.getVolumeTable().get(dbVolumeKey);
-      if (volumeArgs == null) {
-        LOG.debug("volume:{} does not exist", volume);
-        throw new OMException("Volume " + volume + " is not found",
-            ResultCodes.VOLUME_NOT_FOUND);
-      }
-      volumeArgs.setAcls(acls);
-      metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
-
-      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
-      //return volumeArgs.getAclMap().hasAccess(userAcl);
-    } catch (IOException ex) {
-      if (!(ex instanceof OMException)) {
-        LOG.error("Set acl operation failed for volume:{} acls:{}",
-            volume, acls, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
-    }
-
-    return true;
-  }
-
   /**
    * Returns list of ACLs for given Ozone object.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 45b8303bd8..22cfc8037d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -251,9 +251,11 @@ public abstract class OMClientRequest implements RequestAuditor {
 
     // check Acl
     if (ozoneManager.getAclsEnabled()) {
-      String volumeOwner = ozoneManager.getVolumeOwner(obj.getVolumeName(),
+      String volumeOwner = ozoneManager.getVolumeOwner(
+          obj.getVolumeName(),
           contextBuilder.getAclRights(), obj.getResourceType());
-      String bucketOwner = ozoneManager.getBucketOwner(obj.getVolumeName(),
+      String bucketOwner = ozoneManager.getBucketOwner(
+          obj.getVolumeName(),
           obj.getBucketName(), contextBuilder.getAclRights(),
           obj.getResourceType());
       UserGroupInformation currentUser = createUGI();
@@ -269,7 +271,7 @@ public abstract class OMClientRequest implements RequestAuditor {
       } else {
         contextBuilder.setOwnerName(bucketOwner);
       }
-      if (ozoneManager.isNativeAuthorizerEnabled()) {
+      if (ozoneManager.getOmMetadataReader().isNativeAuthorizerEnabled()) {
         if (aclType == IAccessAuthorizer.ACLType.CREATE ||
                 aclType == IAccessAuthorizer.ACLType.DELETE ||
                 aclType == IAccessAuthorizer.ACLType.WRITE_ACL) {
@@ -296,8 +298,10 @@ public abstract class OMClientRequest implements RequestAuditor {
               .setAclRights(parentAclRight)
               .setOwnerName(volumeOwner)
               .build();
-      ozoneManager.checkAcls(volumeObj, volumeContext, true);
-      ozoneManager.checkAcls(obj, contextBuilder.build(), true);
+      ozoneManager.getOmMetadataReader().checkAcls(volumeObj,
+          volumeContext, true);
+      ozoneManager.getOmMetadataReader().checkAcls(obj,
+          contextBuilder.build(), true);
     }
   }
 
@@ -357,7 +361,8 @@ public abstract class OMClientRequest implements RequestAuditor {
       String bucketOwner)
       throws IOException {
 
-    OzoneAclUtils.checkAllAcls(ozoneManager, resType, storeType, aclType,
+    OzoneAclUtils.checkAllAcls(ozoneManager.getOmMetadataReader(),
+            resType, storeType, aclType,
             vol, bucket, key, volOwner, bucketOwner, createUGI(),
             getRemoteAddress(), getHostName());
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 79e0d51443..8a66640160 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -380,7 +380,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
     // Native authorizer requires client id as part of key name to check
     // write ACL on key. Add client id to key name if ozone native
     // authorizer is configured.
-    if (ozoneManager.isNativeAuthorizerEnabled()) {
+    if (ozoneManager.getOmMetadataReader().isNativeAuthorizerEnabled()) {
       keyNameForAclCheck = key + "/" + clientId;
     }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index c2232931d7..b0f794a3f3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataReader;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ScmClient;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
@@ -156,6 +157,9 @@ public class TestOMKeyRequest {
     when(scmClient.getBlockClient()).thenReturn(scmBlockLocationProtocol);
     when(ozoneManager.getKeyManager()).thenReturn(keyManager);
 
+    OmMetadataReader omMetadataReader = Mockito.mock(OmMetadataReader.class);
+    when(ozoneManager.getOmMetadataReader()).thenReturn(omMetadataReader);
+
     prepareState = new OzoneManagerPrepareState(ozoneConfiguration);
     when(ozoneManager.getPrepareState()).thenReturn(prepareState);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to