This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit d8765436c2bf76547973cc568f1648f1b1fe9fcb
Merge: 0bcd697107 ab91e46247
Author: Siyao Meng <[email protected]>
AuthorDate: Mon Jan 9 22:10:25 2023 -0800

    Merge remote-tracking branch 'asf/master' into HDDS-6517-Snapshot
    
    Conflicts:
    
    hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
    hadoop-hdds/common/src/main/resources/ozone-default.xml
    hadoop-hdds/framework/pom.xml
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
    hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
    hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
    hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
    
    Additional modifications:
    
    hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
    hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
    hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
    hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
    
    Change-Id: I3169831b3a9bc156478309a8b2085156243d2b8c

 .github/workflows/{post-commit.yml => ci.yml}      |   86 +-
 .github/workflows/close-pending.yaml               |    2 +-
 .github/workflows/comments.yaml                    |    2 +-
 .github/workflows/post-commit.yml                  |  405 +--
 .../{close-pending.yaml => scheduled_ci.yml}       |   20 +-
 LICENSE.txt                                        |   10 +-
 NOTICE.txt                                         |    2 +-
 README.md                                          |    2 +-
 dev-support/ci/lib/_initialization.sh              |    4 +-
 hadoop-hdds/annotations/pom.xml                    |    4 +-
 hadoop-hdds/client/pom.xml                         |    4 +-
 .../hadoop/hdds/scm/ECXceiverClientGrpc.java       |   56 +-
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |   74 +
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |   21 +-
 .../hadoop/hdds/scm/XceiverClientMetrics.java      |   10 +
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |    7 +-
 .../hdds/scm/storage/AbstractDataStreamOutput.java |  131 +
 ...utputStream.java => BlockDataStreamOutput.java} |  691 +++--
 .../hdds/scm/storage/BlockExtendedInputStream.java |   11 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   50 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |   12 +-
 .../hdds/scm/storage/ByteBufferStreamOutput.java   |   57 +
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |   19 +-
 .../hdds/scm/storage/ECBlockOutputStream.java      |   98 +
 .../hdds/scm/storage/MultipartInputStream.java     |  246 ++
 ...tendedInputStream.java => PartInputStream.java} |   22 +-
 .../hdds/scm/storage/RatisBlockOutputStream.java   |   16 +-
 .../hadoop/hdds/scm/storage/StreamBuffer.java}     |   48 +-
 .../hdds/scm/storage/StreamCommitWatcher.java      |  208 ++
 .../hadoop/ozone/client/io/ECBlockInputStream.java |    5 -
 .../ozone/client/io/ECBlockInputStreamProxy.java   |    9 +-
 .../client/io/ECBlockReconstructedInputStream.java |    2 +-
 .../io/ECBlockReconstructedStripeInputStream.java  |   13 +-
 .../hdds/scm/storage/TestBlockInputStream.java     |  133 +-
 .../hadoop/ozone/client/io/ECStreamTestUtil.java   |    5 -
 .../ozone/client/io/TestECBlockInputStream.java    |    5 -
 .../TestECBlockReconstructedStripeInputStream.java |   20 +
 hadoop-hdds/common/pom.xml                         |    6 +-
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   45 +
 .../hadoop/hdds/client/ReplicationConfig.java      |   11 +
 .../hadoop/hdds/protocol/DatanodeDetails.java      |   18 +-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |   92 +-
 .../apache/hadoop/hdds/recon/ReconConfigKeys.java  |    3 +
 .../apache/hadoop/hdds/scm/PlacementPolicy.java    |   28 +-
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   20 +-
 .../scm/container/common/helpers/ExcludeList.java  |    4 +-
 .../protocol/StorageContainerLocationProtocol.java |    7 +
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   14 +-
 .../hadoop/hdds/security/x509/SecurityConfig.java  |  140 +-
 .../x509/certificate/utils/CertificateCodec.java   |    5 +
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   52 +
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |    3 +
 .../apache/hadoop/ozone/OzoneManagerVersion.java   |    2 +
 .../org/apache/hadoop/ozone/audit/DNAction.java    |    3 +-
 .../org/apache/hadoop/ozone/common/Checksum.java   |    5 +
 .../apache/hadoop/ozone/common/StorageInfo.java    |    4 +
 .../ozone/container/common/helpers/ChunkInfo.java  |   14 +
 .../helpers/ContainerCommandRequestPBHelper.java   |    1 +
 .../java/org/apache/hadoop/util/CacheMetrics.java  |  103 +
 .../org/apache/hadoop/util/CheckedRunnable.java    |   25 +-
 .../org/apache/hadoop/util/CheckedSupplier.java    |   29 +
 .../java/org/apache/hadoop/util/MetricUtil.java    |   52 +
 .../common/src/main/resources/ozone-default.xml    |  192 +-
 .../hadoop/ozone/audit/AuditLogTestUtils.java      |   79 +
 .../ozone/container/ContainerTestHelper.java       |   30 +-
 hadoop-hdds/config/pom.xml                         |    4 +-
 .../org/apache/hadoop/hdds/conf/ConfigTag.java     |    3 +-
 hadoop-hdds/container-service/pom.xml              |   17 +-
 .../hdds/datanode/metadata/CRLDBDefinition.java    |    5 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  141 +-
 .../container/common/helpers/ContainerUtils.java   |   29 +
 .../common/impl/ContainerLayoutVersion.java        |   26 +-
 .../ozone/container/common/impl/ContainerSet.java  |    3 +-
 .../container/common/impl/HddsDispatcher.java      |   66 +-
 .../common/interfaces/ContainerDispatcher.java     |   10 +
 .../ozone/container/common/interfaces/Handler.java |   16 +
 .../container/common/report/ReportPublisher.java   |    6 +-
 .../common/statemachine/DatanodeConfiguration.java |   38 +-
 .../common/statemachine/DatanodeQueueMetrics.java  |  179 ++
 .../common/statemachine/DatanodeStateMachine.java  |   30 +-
 .../common/statemachine/EndpointStateMachine.java  |    1 +
 .../common/statemachine/StateContext.java          |   59 +-
 .../commandhandler/DeleteBlocksCommandHandler.java |   19 +-
 .../DeleteContainerCommandHandler.java             |   67 +-
 .../ReconstructECContainersCommandHandler.java     |    6 +-
 .../ReplicateContainerCommandHandler.java          |    3 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |    7 +
 .../common/transport/server/XceiverServerGrpc.java |    4 +-
 .../server/ratis/ContainerStateMachine.java        |   79 +-
 .../common/transport/server/ratis/LocalStream.java |   58 +
 .../transport/server/ratis/XceiverServerRatis.java |   43 +-
 .../container/common/utils/DatanodeStoreCache.java |   29 +-
 .../common/utils/db/DatanodeDBProfile.java         |   16 +-
 .../reconstruction/ECContainerOperationClient.java |   40 +-
 .../ECReconstructionCommandInfo.java               |   88 +-
 .../ECReconstructionCoordinator.java               |   77 +-
 .../ECReconstructionCoordinatorTask.java           |   66 +-
 .../ec/reconstruction/ECReconstructionMetrics.java |   80 +
 .../reconstruction/ECReconstructionSupervisor.java |   11 +-
 .../container/keyvalue/KeyValueContainer.java      |    9 +-
 .../container/keyvalue/KeyValueContainerCheck.java |  108 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  104 +
 .../container/keyvalue/TarContainerPacker.java     |   34 +-
 .../container/keyvalue/helpers/BlockUtils.java     |    2 +-
 .../keyvalue/impl/ChunkManagerDispatcher.java      |   16 +
 .../keyvalue/impl/FilePerBlockStrategy.java        |   20 +
 .../keyvalue/impl/KeyValueStreamDataChannel.java   |  277 ++
 .../keyvalue/impl/StreamDataChannelBase.java       |   96 +
 .../keyvalue/interfaces/ChunkManager.java          |   13 +
 .../background/BlockDeletingService.java           |   82 +-
 .../StaleRecoveringContainerScrubbingService.java  |    5 +-
 .../metadata/DatanodeSchemaThreeDBDefinition.java  |    4 +
 .../ozoneimpl/ContainerScannerConfiguration.java   |   29 +-
 .../ozoneimpl/OnDemandContainerScanner.java        |  178 ++
 .../ozoneimpl/OnDemandScannerMetrics.java          |   42 +
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   57 +-
 .../replication/ContainerReplicationSource.java    |    5 +-
 .../replication/CopyContainerCompression.java      |   78 +
 .../replication/GrpcReplicationClient.java         |   20 +-
 .../replication/GrpcReplicationService.java        |    8 +-
 .../OnDemandContainerReplicationSource.java        |   19 +-
 .../container/replication/ReplicationServer.java   |   26 +-
 .../replication/ReplicationSupervisor.java         |   62 +-
 .../replication/ReplicationSupervisorMetrics.java  |    5 +-
 .../container/replication/ReplicationTask.java     |   34 +-
 .../replication/SimpleContainerDownloader.java     |   12 +-
 .../protocol/commands/CloseContainerCommand.java   |   10 +
 .../protocol/commands/DeleteContainerCommand.java  |   16 +
 .../commands/ReconstructECContainersCommand.java   |   17 +
 .../commands/ReplicateContainerCommand.java        |   13 +
 .../hadoop/ozone/protocol/commands/SCMCommand.java |   35 +
 .../metadata/TestDatanodeCRLStoreImpl.java         |   24 +-
 .../hadoop/ozone/TestHddsDatanodeService.java      |   16 +-
 .../hadoop/ozone/TestHddsSecureDatanodeInit.java   |  315 ++-
 .../ozone/container/common/ContainerTestUtils.java |   39 +-
 .../container/common/TestBlockDeletingService.java |  200 +-
 .../common/TestContainerLayoutVersion.java         |    4 +-
 .../common/TestDatanodeLayOutVersion.java          |   10 +-
 .../container/common/TestDatanodeStateMachine.java |   60 +-
 .../container/common/TestDatanodeStoreCache.java   |    2 +-
 .../TestSchemaOneBackwardsCompatibility.java       |    2 +-
 ...stStaleRecoveringContainerScrubbingService.java |   38 +-
 .../common/helpers/TestContainerUtils.java         |    4 +-
 .../container/common/impl/TestHddsDispatcher.java  |   68 +
 .../container/common/report/TestReportManager.java |    2 +-
 .../common/report/TestReportPublisher.java         |   39 +-
 .../statemachine/TestDatanodeConfiguration.java    |    4 +-
 .../common/statemachine/TestStateContext.java      |  103 +-
 .../TestDeleteContainerCommandHandler.java         |  135 +
 .../states/datanode/TestRunningDatanodeState.java  |    8 +-
 .../states/endpoint/TestHeartbeatEndpointTask.java |  103 +-
 .../volume/TestCapacityVolumeChoosingPolicy.java   |   31 +-
 .../container/common/volume/TestHddsVolume.java    |   10 +-
 .../volume/TestRoundRobinVolumeChoosingPolicy.java |   35 +-
 .../TestVolumeIOStatsWithPrometheusSink.java       |    2 +-
 .../TestECReconstructionSupervisor.java            |  103 +-
 .../container/keyvalue/TestKeyValueContainer.java  |  192 +-
 .../keyvalue/TestKeyValueContainerCheck.java       |    4 +-
 .../TestKeyValueContainerMarkUnhealthy.java        |    2 +
 .../container/keyvalue/TestKeyValueHandler.java    |   10 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |   15 +-
 .../container/keyvalue/TestTarContainerPacker.java |   37 +-
 .../container/keyvalue/helpers/TestChunkUtils.java |   20 +-
 .../keyvalue/impl/AbstractTestChunkManager.java    |   25 +-
 .../keyvalue/impl/CommonChunkManagerTestCases.java |   10 +-
 .../keyvalue/impl/TestChunkManagerDummyImpl.java   |    4 +-
 .../keyvalue/impl/TestFilePerBlockStrategy.java    |    6 +-
 .../keyvalue/impl/TestFilePerChunkStrategy.java    |    8 +-
 .../impl/TestKeyValueStreamDataChannel.java        |  313 +++
 .../TestContainerScannerConfiguration.java         |   22 +-
 .../ozoneimpl/TestContainerScannerMetrics.java     |   95 +-
 .../ozoneimpl/TestOnDemandContainerScanner.java    |  157 ++
 .../ReplicationSupervisorScheduling.java           |   18 +-
 .../replication/TestGrpcOutputStream.java          |   16 +-
 .../replication/TestMeasuredReplicator.java        |   36 +-
 .../replication/TestReplicationConfig.java         |    4 +-
 .../replication/TestReplicationSupervisor.java     |  111 +-
 .../replication/TestSimpleContainerDownloader.java |   19 +-
 .../stream/TestDirstreamClientHandler.java         |   34 +-
 .../container/stream/TestStreamingServer.java      |   18 +-
 .../upgrade/TestDatanodeUpgradeToSchemaV3.java     |    4 +
 .../upgrade/TestDatanodeUpgradeToScmHA.java        |    2 +-
 .../TestReconstructionECContainersCommands.java    |   27 +-
 .../dev-support/checkstyle/suppressions.xml        |    1 +
 hadoop-hdds/docs/content/concept/Recon.md          |    1 +
 .../docs/content/feature/Nonrolling-Upgrade.md     |    6 +-
 hadoop-hdds/docs/content/feature/OM-HA.md          |    4 +-
 hadoop-hdds/docs/content/feature/OM-HA.zh.md       |    4 +-
 hadoop-hdds/docs/content/feature/SCM-HA.md         |    7 +
 hadoop-hdds/docs/content/feature/SCM-HA.zh.md      |   73 +-
 .../content/feature/Streaming-Write-Pipeline.md    |  136 +
 hadoop-hdds/docs/content/interface/Ofs.md          |   32 +-
 hadoop-hdds/docs/content/interface/S3.zh.md        |    2 +-
 hadoop-hdds/docs/content/security/SecuringTDE.md   |    2 +-
 hadoop-hdds/docs/content/security/SecurityAcls.md  |    9 +-
 .../docs/content/security/SecurityAcls.zh.md       |    7 +-
 .../docs/content/security/SecurityWithRanger.md    |    2 +-
 .../docs/content/security/SecurityWithRanger.zh.md |    2 +-
 hadoop-hdds/docs/pom.xml                           |    4 +-
 .../themes/ozonedoc/layouts/partials/header.html   |   22 +
 hadoop-hdds/erasurecode/pom.xml                    |    4 +-
 .../framework/dev-support/findbugsExcludeFile.xml  |    4 -
 hadoop-hdds/framework/pom.xml                      |   17 +-
 .../hdds/conf/DatanodeRatisServerConfig.java       |   35 +
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |    4 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |   20 +
 .../hadoop/hdds/security/ssl/KeyStoresFactory.java |   69 +
 .../hdds/security/ssl/MonitoringTimerTask.java     |   77 +
 .../security/ssl/PemFileBasedKeyStoresFactory.java |  199 ++
 .../hdds/security/ssl/ReloadingX509KeyManager.java |  178 ++
 .../security/ssl/ReloadingX509TrustManager.java    |  152 ++
 .../hadoop/hdds/security/ssl/package-info.java     |   21 +-
 .../certificate/authority/DefaultCAServer.java     |  116 +-
 .../x509/certificate/client/CertificateClient.java |   55 +-
 .../client/CommonCertificateClient.java            |   16 +-
 .../certificate/client/DNCertificateClient.java    |   97 +-
 .../client/DefaultCertificateClient.java           |  650 ++++-
 .../certificate/client/OMCertificateClient.java    |   57 -
 .../certificate/client/ReconCertificateClient.java |  104 +-
 .../certificate/client/SCMCertificateClient.java   |   24 +-
 .../certificates/utils/SelfSignedCertificate.java  |   38 +-
 .../x509/exceptions/CertificateException.java      |    8 +-
 .../hadoop/hdds/security/x509/keys/KeyCodec.java   |   16 +-
 .../hdds/security/x509/keys/SecurityUtil.java      |   34 +
 .../hadoop/hdds/server/events/EventExecutor.java   |   21 +
 .../FixedThreadPoolWithAffinityExecutor.java       |  219 +-
 .../hadoop/hdds/server/events/IEventInfo.java      |   13 +-
 .../hdds/server/events/SingleThreadExecutor.java   |    6 +-
 .../hadoop/hdds/server/http/BaseHttpServer.java    |    2 +-
 .../hdds/server/http/PrometheusMetricsSink.java    |  109 +-
 .../hdds/server/http/RatisDropwizardExports.java   |   86 +-
 .../hadoop/hdds/utils/DecayRpcSchedulerUtil.java   |  120 +
 .../org/apache/hadoop/hdds/utils/MetricsUtil.java  |  100 +
 .../hdds/utils/PrometheusMetricsSinkUtil.java      |  116 +
 .../hadoop/hdds/utils/TableCacheMetrics.java       |   92 +
 .../apache/hadoop/hdds/utils/UgiMetricsUtil.java   |   68 +
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |    6 +-
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |  221 +-
 .../org/apache/hadoop/hdds/utils/db/Table.java     |   12 +-
 .../apache/hadoop/hdds/utils/db/TableConfig.java   |    4 +-
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |    8 +-
 .../hadoop/hdds/utils/db/cache/CacheStats.java     |   30 +-
 .../hdds/utils/db/cache/CacheStatsRecorder.java    |   47 +-
 .../hadoop/hdds/utils/db/cache/FullTableCache.java |   16 +-
 .../hdds/utils/db/cache/PartialTableCache.java     |   13 +-
 .../hadoop/hdds/utils/db/cache/TableCache.java     |    6 +
 .../db/managed/ManagedColumnFamilyOptions.java     |   13 +
 .../hdds/utils/db/managed/ManagedObject.java       |   22 +-
 .../utils/db/managed/ManagedRocksObjectUtils.java  |   23 +-
 .../ssl/TestPemFileBasedKeyStoresFactory.java      |  255 ++
 .../security/ssl/TestReloadingX509KeyManager.java  |   83 +
 .../ssl/TestReloadingX509TrustManager.java         |   83 +
 .../token/TestOzoneBlockTokenSecretManager.java    |    4 +-
 .../hdds/security/x509/CertificateClientTest.java  |  104 +-
 .../certificate/authority/TestDefaultCAServer.java |  101 +-
 .../client/TestDefaultCertificateClient.java       |  316 ++-
 ...tInit.java => TestDnCertificateClientInit.java} |   69 +-
 .../x509/certificate/utils/TestCRLCodec.java       |    9 +-
 .../certificate/utils/TestCertificateCodec.java    |   27 +-
 .../x509/certificates/TestRootCertificate.java     |   24 +-
 .../hadoop/hdds/server/events/TestEventQueue.java  |   24 +-
 ....java => TestPrometheusMetricsIntegration.java} |  148 +-
 .../hdds/utils/TestDecayRpcSchedulerUtil.java      |  108 +
 .../hdds/utils/TestPrometheusMetricsSinkUtil.java  |  229 ++
 .../hadoop/hdds/utils/TestUgiMetricsUtil.java      |   63 +
 .../hadoop/hdds/utils/db/TestRDBStoreIterator.java |   31 +
 .../hadoop/hdds/utils/db/cache/TestTableCache.java |   44 +-
 hadoop-hdds/hadoop-dependency-client/pom.xml       |    8 +-
 hadoop-hdds/hadoop-dependency-server/pom.xml       |    8 +-
 hadoop-hdds/hadoop-dependency-test/pom.xml         |    4 +-
 hadoop-hdds/interface-admin/pom.xml                |    4 +-
 .../src/main/proto/ScmAdminProtocol.proto          |    1 +
 .../interface-admin/src/main/resources/proto.lock  |  679 +++--
 hadoop-hdds/interface-client/pom.xml               |    4 +-
 .../src/main/proto/DatanodeClientProtocol.proto    |   15 +-
 .../interface-client/src/main/resources/proto.lock |  991 ++++++--
 hadoop-hdds/interface-server/pom.xml               |    4 +-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |    4 +
 .../interface-server/src/main/resources/proto.lock |  840 ++++--
 hadoop-hdds/pom.xml                                |    4 +-
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      |    6 +-
 hadoop-hdds/server-scm/pom.xml                     |   14 +-
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |  205 +-
 .../java/org/apache/hadoop/hdds/scm/ScmUtils.java  |   34 +
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java |   14 +-
 .../scm/block/DeletedBlockLogStateManager.java     |    3 +-
 .../scm/block/DeletedBlockLogStateManagerImpl.java |    2 +-
 .../hdds/scm/block/SCMBlockDeletingService.java    |    2 +-
 .../scm/container/CloseContainerEventHandler.java  |    5 +-
 .../hdds/scm/container/ContainerManager.java       |   32 +-
 .../hdds/scm/container/ContainerManagerImpl.java   |   48 +-
 .../hdds/scm/container/ContainerReportHandler.java |    7 +-
 .../scm/container/ContainerStateManagerImpl.java   |    6 +-
 .../IncrementalContainerReportHandler.java         |    5 +-
 .../balancer/AbstractFindTargetGreedy.java         |   54 +-
 .../scm/container/balancer/ContainerBalancer.java  | 1170 +--------
 .../ContainerBalancerSelectionCriteria.java        |   22 +-
 ...nerBalancer.java => ContainerBalancerTask.java} |  712 ++----
 .../scm/container/balancer/FindSourceGreedy.java   |   55 +-
 .../scm/container/balancer/FindSourceStrategy.java |   11 +
 .../FindTargetGreedyByNetworkTopology.java         |   21 +
 .../balancer/FindTargetGreedyByUsageInfo.java      |   23 +
 .../scm/container/balancer/FindTargetStrategy.java |    9 +
 .../ContainerPlacementStatusDefault.java           |   36 +-
 .../algorithms/SCMContainerPlacementRackAware.java |    8 +
 .../SCMContainerPlacementRackScatter.java          |  138 +-
 .../algorithms/SCMContainerPlacementRandom.java    |    3 +-
 .../AbstractOverReplicationHandler.java            |    4 +-
 .../replication/ContainerCheckRequest.java         |    6 +-
 .../replication/ContainerHealthResult.java         |   72 +-
 .../replication/ContainerReplicaCount.java         |    3 +
 .../replication/ContainerReplicaPendingOps.java    |  105 +-
 .../ContainerReplicaPendingOpsSubscriber.java      |   33 +-
 .../DatanodeCommandCountUpdatedHandler.java}       |   39 +-
 .../replication/ECContainerReplicaCount.java       |   83 +-
 .../replication/ECMisReplicationHandler.java       |   71 +
 .../replication/ECOverReplicationHandler.java      |   43 +-
 .../replication/ECUnderReplicationHandler.java     |  307 ++-
 .../replication/LegacyReplicationManager.java      |   74 +-
 .../replication/MisReplicationHandler.java         |  190 ++
 .../replication/OverReplicatedProcessor.java       |  124 +-
 .../RatisContainerReplicaCount.java                |   99 +-
 .../replication/RatisMisReplicationHandler.java    |   78 +
 .../replication/RatisOverReplicationHandler.java   |  247 ++
 .../replication/RatisUnderReplicationHandler.java  |  242 ++
 .../container/replication/ReplicationManager.java  |  248 +-
 .../replication/ReplicationManagerMetrics.java     |  107 +-
 .../replication/UnderReplicatedProcessor.java      |  135 +-
 .../replication/UnhealthyReplicationProcessor.java |  151 ++
 .../ClosedWithMismatchedReplicasHandler.java       |   64 +-
 .../health/ClosedWithUnhealthyReplicasHandler.java |  129 +
 .../health/ClosingContainerHandler.java            |   12 +-
 .../health/DeletingContainerHandler.java           |   99 +
 .../health/ECReplicationCheckHandler.java          |   99 +-
 .../replication/health/EmptyContainerHandler.java  |  126 +
 .../replication/health/OpenContainerHandler.java   |   11 +-
 .../health/QuasiClosedContainerHandler.java        |    4 +
 .../health/RatisReplicationCheckHandler.java       |  219 ++
 .../container/report/ContainerReportValidator.java |  104 +
 .../hdds/scm/container/report}/package-info.java   |   12 +-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   13 +-
 .../hadoop/hdds/scm/ha/BackgroundSCMService.java   |    2 +-
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |    7 +-
 .../hdds/scm/metadata/SCMMetadataStoreImpl.java    |    5 +-
 .../hdds/scm/metadata/SCMMetadataStoreMetrics.java |   11 -
 .../apache/hadoop/hdds/scm/node/CommandQueue.java  |   66 +-
 .../hdds/scm/node/CommandQueueReportHandler.java   |    3 +-
 .../hadoop/hdds/scm/node/DatanodeAdminMonitor.java |    2 +-
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   67 +-
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  |   29 +-
 .../hdds/scm/node/NodeDecommissionManager.java     |    8 +-
 .../hdds/scm/node/NodeDecommissionMetrics.java     |  290 +++
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   19 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   22 +-
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |   42 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   37 +-
 .../pipeline/PipelinePlacementPolicyFactory.java   |   52 +
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |    6 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   34 +-
 .../scm/pipeline/WritableECContainerProvider.java  |   11 +-
 ...inerLocationProtocolServerSideTranslatorPB.java |   17 +
 .../hdds/scm/safemode/SCMSafeModeManager.java      |    4 -
 .../hdds/scm/server/ContainerReportQueue.java      |  378 +++
 .../hdds/scm/server/SCMClientProtocolServer.java   |   18 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |   86 +-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |   41 +-
 .../apache/hadoop/hdds/scm/server/SCMMXBean.java   |    6 -
 .../hdds/scm/server/StorageContainerManager.java   |  273 +-
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |  102 +-
 .../hdds/scm/TestSCMCommonPlacementPolicy.java     |  431 +++-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |    6 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |   19 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |   16 +-
 .../hdds/scm/container/SimpleMockNodeManager.java  |   15 +-
 .../scm/container/TestContainerManagerImpl.java    |   12 +-
 .../scm/container/TestContainerReportHandler.java  |   59 +
 .../TestIncrementalContainerReportHandler.java     |  124 +-
 .../container/balancer/TestContainerBalancer.java  |  995 +-------
 ...alancer.java => TestContainerBalancerTask.java} |  228 +-
 .../container/balancer/TestFindTargetStrategy.java |   34 +
 .../algorithms/TestContainerPlacementFactory.java  |   20 +-
 .../TestContainerPlacementStatusDefault.java       |   22 +-
 .../TestSCMContainerPlacementRackScatter.java      |   96 +-
 .../container/replication/ReplicationTestUtil.java |   39 +-
 .../TestContainerReplicaPendingOps.java            |  127 +
 .../TestDatanodeCommandCountUpdatedHandler.java    |   49 +
 .../replication/TestECContainerReplicaCount.java   |  107 +
 .../replication/TestECMisReplicationHandler.java   |  174 ++
 .../replication/TestECOverReplicationHandler.java  |   44 +-
 .../replication/TestECUnderReplicationHandler.java |  379 ++-
 .../replication/TestLegacyReplicationManager.java  |   37 +-
 .../replication/TestMisReplicationHandler.java     |  182 ++
 .../replication/TestOverReplicatedProcessor.java   |   41 +-
 .../TestRatisContainerReplicaCount.java            |   58 +-
 .../TestRatisMisReplicationHandler.java            |  178 ++
 .../TestRatisOverReplicationHandler.java           |  281 ++
 .../TestRatisUnderReplicationHandler.java          |  228 ++
 .../replication/TestReplicationManager.java        |  302 ++-
 .../replication/TestReplicationManagerMetrics.java |    2 +
 .../replication/TestUnderReplicatedProcessor.java  |   65 +-
 .../TestClosedWithMismatchedReplicasHandler.java   |   16 +-
 .../TestClosedWithUnhealthyReplicasHandler.java    |  194 ++
 .../health/TestClosingContainerHandler.java        |  103 +-
 .../health/TestDeletingContainerHandler.java       |  238 ++
 .../health/TestECReplicationCheckHandler.java      |  378 ++-
 ...Handler.java => TestEmptyContainerHandler.java} |  177 +-
 .../health/TestRatisReplicationCheckHandler.java   |  545 ++++
 .../report/TestContainerReportValidator.java       |   77 +
 .../hdds/scm/container/report/package-info.java    |   19 +-
 .../scm/metadata/TestSCMMetadataStoreImpl.java     |   10 -
 .../scm/node/DatanodeAdminMonitorTestUtil.java     |  209 ++
 .../hadoop/hdds/scm/node/TestCommandQueue.java     |  101 +
 .../scm/node/TestCommandQueueReportHandler.java    |   30 +-
 .../hdds/scm/node/TestContainerPlacement.java      |    4 +-
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |  224 +-
 .../hdds/scm/node/TestNodeDecommissionMetrics.java |  343 +++
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   74 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |   67 +-
 .../scm/pipeline/TestPipelinePlacementFactory.java |  215 ++
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |   15 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |   22 +-
 .../pipeline/TestWritableECContainerProvider.java  |    5 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |    8 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |   27 +-
 .../ozone/container/common/TestEndPoint.java       |    8 +
 .../testutils/ReplicationNodeManagerMock.java      |   15 +-
 hadoop-hdds/test-utils/pom.xml                     |   10 +-
 .../org/apache/ozone/test/GenericTestUtils.java    |   51 +
 hadoop-hdds/tools/pom.xml                          |    8 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |  169 +-
 .../hadoop/hdds/scm/cli/cert/CertCommands.java     |    1 +
 .../hadoop/hdds/scm/cli/cert/CleanExpired.java     |  117 +
 .../hadoop/hdds/scm/cli/cert/TestCleanExpired.java |  100 +
 hadoop-ozone/client/pom.xml                        |    4 +-
 .../apache/hadoop/ozone/client/ObjectStore.java    |   13 -
 .../apache/hadoop/ozone/client/OzoneBucket.java    |   58 +
 .../org/apache/hadoop/ozone/client/OzoneKey.java   |   40 +-
 .../hadoop/ozone/client/OzoneKeyDetails.java       |   11 +-
 .../client/checksum/BaseFileChecksumHelper.java    |   12 +-
 .../client/checksum/ChecksumHelperFactory.java     |   52 +
 .../client/checksum/ECBlockChecksumComputer.java   |  221 ++
 .../client/checksum/ECFileChecksumHelper.java      |  211 ++
 .../client/io/BlockDataStreamOutputEntry.java      |  287 +++
 .../client/io/BlockDataStreamOutputEntryPool.java  |  290 +++
 .../ozone/client/io/BlockOutputStreamEntry.java    |   13 +
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |   34 +-
 .../client/io/ECBlockOutputStreamEntryPool.java    |    4 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  |  234 +-
 ...yOutputStream.java => KeyDataStreamOutput.java} |  334 +--
 .../hadoop/ozone/client/io/KeyInputStream.java     |  436 +---
 .../hadoop/ozone/client/io/KeyOutputStream.java    |   24 +-
 .../client/io/MultipartCryptoKeyInputStream.java   |  382 ---
 .../ozone/client/io/OzoneCryptoInputStream.java    |  157 +-
 ...utputStream.java => OzoneDataStreamOutput.java} |   49 +-
 .../hadoop/ozone/client/io/OzoneOutputStream.java  |   25 +-
 .../ozone/client/protocol/ClientProtocol.java      |   59 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  363 ++-
 .../hadoop/ozone/client/MockOmTransport.java       |   14 +
 .../hadoop/ozone/client/TestOzoneECClient.java     |   51 +-
 hadoop-ozone/common/pom.xml                        |    4 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   12 +-
 .../hadoop/ozone/om/helpers/BucketLayout.java      |   13 +
 .../ozone/om/helpers/KeyInfoWithVolumeContext.java |  110 +
 .../hadoop/ozone/om/helpers/OmBucketArgs.java      |   70 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyArgs.java  |   55 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  106 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   71 +
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   32 +-
 .../hadoop/ozone/om/helpers/TestOmBucketArgs.java  |   91 +
 hadoop-ozone/csi/pom.xml                           |    4 +-
 hadoop-ozone/datanode/pom.xml                      |    4 +-
 hadoop-ozone/dev-support/checks/coverage.sh        |    6 +-
 hadoop-ozone/dev-support/intellij/log4j.properties |    2 +
 .../dev-support/intellij/ozone-site-ha.xml         |  174 ++
 hadoop-ozone/dev-support/intellij/ozone-site.xml   |   20 +
 ...torageContainerManager.xml => Datanode1-ha.xml} |    8 +-
 .../{Datanode2.xml => Datanode2-ha.xml}            |    4 +-
 .../intellij/runConfigurations/Datanode2.xml       |    2 +-
 .../{Datanode2.xml => Datanode3-ha.xml}            |    4 +-
 .../intellij/runConfigurations/Datanode3.xml       |    2 +-
 ...ageContainerManager.xml => OzoneFsShell-ha.xml} |   13 +-
 ...ageContainerManager.xml => OzoneManager-ha.xml} |    8 +-
 ...ontainerManager.xml => OzoneManagerInit-ha.xml} |    8 +-
 ...orageContainerManager.xml => OzoneShell-ha.xml} |    8 +-
 ...geContainerManager.xml => PrimordialSCM-ha.xml} |    4 +-
 ...ntainerManager.xml => PrimordialSCMInit-ha.xml} |    4 +-
 .../{StorageContainerManager.xml => Recon-ha.xml}  |    8 +-
 .../{StorageContainerManager.xml => Scm2-ha.xml}   |    4 +-
 ...geContainerManager.xml => Scm2Bootstrap-ha.xml} |    4 +-
 .../{StorageContainerManager.xml => Scm3-ha.xml}   |    4 +-
 ...geContainerManager.xml => Scm3Bootstrap-ha.xml} |    4 +-
 .../{StorageContainerManager.xml => ScmRoles.xml}  |    8 +-
 .../runConfigurations/StorageContainerManager.xml  |    2 +-
 .../dist/dev-support/bin/dist-layout-stitching     |    1 -
 hadoop-ozone/dist/pom.xml                          |   16 +-
 .../ozone-legacy-bucket/.env}                      |   21 +-
 .../src/main/compose/ozone-legacy-bucket/README.md |   18 +-
 .../ozone-legacy-bucket/docker-compose.yaml        |   78 +
 .../{ozone => ozone-legacy-bucket}/docker-config   |    2 +
 .../compose/ozone-legacy-bucket/test.sh}           |   29 +-
 .../main/compose/ozone-om-ha/docker-compose.yaml   |    5 +
 .../dist/src/main/compose/ozone/docker-config      |    2 +
 .../src/main/compose/ozonesecure/docker-config     |    2 +
 hadoop-ozone/dist/src/main/license/bin/LICENSE.txt |  146 +-
 hadoop-ozone/dist/src/main/license/bin/NOTICE.txt  |    2 +-
 .../main/license/bin/licenses/LICENSE-angular.txt  |    2 +-
 .../src/main/license/bin/licenses/LICENSE-d3.txt   |    2 +-
 .../licenses/LICENSE-glyphicons.txt}               |    4 +-
 .../main/license/bin/licenses/LICENSE-guava.txt    |   13 +
 .../LICENSE-javax.activation-activation.txt        |    3 -
 .../LICENSE-net.sf.jopt-simple-jopt-simple.txt     |   24 -
 .../license/bin/licenses/LICENSE-org.aspectj.html  |   91 -
 .../license/bin/licenses/LICENSE-org.aspectj.txt   |  279 ++
 .../bin/licenses/LICENSE-org.ow2.asm-asm.txt       |    1 -
 .../license/bin/licenses/LICENSE-org.slf4j.txt     |    2 +-
 ...y-misc.txt => NOTICE-ratis-thirdparty-misc.txt} |   24 +-
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   14 +-
 .../main/license/src/licenses/LICENSE-angular.txt  |    2 +-
 .../{LICENSE-angular.txt => LICENSE-bootstrap.txt} |    4 +-
 .../src/main/license/src/licenses/LICENSE-d3.txt   |    4 +-
 ...{LICENSE-angular.txt => LICENSE-glyphicons.txt} |    4 +-
 .../main/license/src/licenses/LICENSE-guava.txt    |   13 +
 .../main/license/src/licenses/LICENSE-jquery.txt   |    2 +-
 .../src/main/license/src/licenses/LICENSE-nvd3.txt |   17 +-
 .../dist/src/main/license/update-jar-report.sh     |    2 +-
 .../src/main/smoketest/basic/ozone-shell-lib.robot |    7 +-
 .../smoketest/compatibility/dn-one-rocksdb.robot   |    2 +-
 .../dist/src/main/smoketest/ec/ozonefs.robot       |   61 +
 .../src/main/smoketest/freon/read-write-key.robot  |   53 +
 .../dist/src/main/smoketest/ozonefs/ozonefs.robot  |   37 +-
 ...n-fso-nssummary.robot => recon-nssummary.robot} |   12 +-
 .../dist/src/main/smoketest/s3/commonawslib.robot  |    5 +
 .../dist/src/main/smoketest/s3/objectputget.robot  |   26 +
 hadoop-ozone/dist/src/shell/ozone/ozone            |   24 +-
 .../dist/src/shell/ozone/ozone-functions.sh        |   87 +
 .../fault-injection-test/mini-chaos-tests/pom.xml  |    4 +-
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java |    2 -
 .../fault-injection-test/network-tests/pom.xml     |    2 +-
 hadoop-ozone/fault-injection-test/pom.xml          |    4 +-
 hadoop-ozone/insight/pom.xml                       |    4 +-
 hadoop-ozone/integration-test/pom.xml              |    4 +-
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |  111 +-
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java |  160 ++
 .../hadoop/fs/ozone/TestOzoneFileChecksum.java     |  177 ++
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  182 +-
 .../fs/ozone/TestOzoneFileSystemWithStreaming.java |  158 ++
 .../hadoop/fs/ozone/TestRootedDDSWithFSO.java      |    2 +-
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |  158 +-
 .../hdds/scm/TestSCMDatanodeProtocolServer.java    |   54 +
 .../TestContainerStateManagerIntegration.java      |    2 +-
 .../TestRatisPipelineCreateAndDestroy.java         |    9 +-
 .../hdds/scm/storage/TestContainerCommandsEC.java  |   17 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   20 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   23 +
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |    4 +
 .../hadoop/ozone/TestOzoneConfigurationFields.java |    4 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  262 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |  157 +-
 .../ozone/TestStorageContainerManagerHelper.java   |   20 +-
 .../ozone/client/CertificateClientTestImpl.java    |  164 +-
 .../apache/hadoop/ozone/client/rpc/TestBCSID.java  |    3 -
 .../client/rpc/TestBlockDataStreamOutput.java      |  273 ++
 .../ozone/client/rpc/TestBlockOutputStream.java    |    2 -
 .../rpc/TestBlockOutputStreamFlushDelay.java       |    2 -
 .../rpc/TestBlockOutputStreamWithFailures.java     |    2 -
 ...estBlockOutputStreamWithFailuresFlushDelay.java |    2 -
 .../rpc/TestCloseContainerHandlingByClient.java    |    9 -
 .../client/rpc/TestContainerStateMachine.java      |    2 -
 .../TestContainerStateMachineFailureOnRead.java    |    2 -
 .../rpc/TestContainerStateMachineFailures.java     |    1 -
 .../rpc/TestContainerStateMachineFlushDelay.java   |    2 -
 .../rpc/TestContainerStateMachineStream.java       |  219 ++
 .../client/rpc/TestDeleteWithSlowFollower.java     |    3 -
 .../client/rpc/TestDiscardPreallocatedBlocks.java  |    2 -
 .../ozone/client/rpc/TestECKeyOutputStream.java    |   24 +-
 .../client/rpc/TestFailureHandlingByClient.java    |    5 -
 .../rpc/TestFailureHandlingByClientFlushDelay.java |    1 -
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |    4 -
 .../client/rpc/TestOzoneAtRestEncryption.java      |    7 +-
 .../rpc/TestOzoneClientMultipartUploadWithFSO.java |  441 ++--
 ...estOzoneClientRetriesOnExceptionFlushDelay.java |    4 -
 .../rpc/TestOzoneClientRetriesOnExceptions.java    |    3 -
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  106 +-
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |  131 +-
 .../hadoop/ozone/client/rpc/TestReadRetries.java   |    2 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |    1 -
 .../client/rpc/TestValidateBCSIDOnRestart.java     |    2 +-
 .../client/rpc/read/TestChunkInputStream.java      |    6 +-
 .../ozone/client/rpc/read/TestInputStreamBase.java |    2 -
 .../ozone/client/rpc/read/TestKeyInputStream.java  |    6 +-
 .../ozone/container/TestECContainerRecovery.java   |  130 +-
 .../apache/hadoop/ozone/container/TestHelper.java  |   42 +-
 .../commandhandler/TestBlockDeletion.java          |   10 +-
 .../TestCloseContainerByPipeline.java              |    6 +-
 .../commandhandler/TestCloseContainerHandler.java  |    1 -
 .../commandhandler/TestDeleteContainerHandler.java |    1 -
 .../metrics/TestDatanodeQueueMetrics.java          |  111 +
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |  239 +-
 .../container/server/TestContainerServer.java      |    3 +-
 .../server/TestSecureContainerServer.java          |    4 +
 .../ozone/dn/ratis/TestDnRatisLogParser.java       |    1 -
 .../hadoop/ozone/dn/scanner/TestDataScanner.java   |  182 +-
 .../TestDatanodeHddsVolumeFailureDetection.java    |   36 +-
 .../ozone/om/TestContainerReportWithKeys.java      |    1 -
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |   80 +-
 .../org/apache/hadoop/ozone/om/TestKeyPurging.java |    1 +
 .../org/apache/hadoop/ozone/om/TestLDBCli.java     |   21 +
 .../hadoop/ozone/om/TestOMEpochForNonRatis.java    |    2 +-
 .../ozone/om/TestObjectStoreWithLegacyFS.java      |   52 +-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |  171 +-
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |    2 -
 .../ozone/om/TestOmContainerLocationCache.java     |  674 +++++
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |   74 +-
 .../ozone/om/TestOzoneManagerHAKeyDeletion.java    |   11 +-
 .../ozone/om/TestOzoneManagerListVolumes.java      |   48 +-
 .../apache/hadoop/ozone/om/TestScmSafeMode.java    |    3 -
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |   20 +-
 .../ozone/om/multitenant/RangerUserRequest.java    |    3 +-
 .../TestRangerBGSyncService.java                   |    7 +-
 .../ozone/parser/TestOzoneHARatisLogParser.java    |   61 +-
 .../hadoop/ozone/recon/TestReconAsPassiveScm.java  |    4 +-
 .../apache/hadoop/ozone/recon/TestReconTasks.java  |   40 +-
 .../ozone/scm/TestSCMInstallSnapshotWithHA.java    |    5 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   20 -
 .../ozone/scm/TestStorageContainerManagerHA.java   |    2 +-
 .../TestSCMPipelineBytesWrittenMetrics.java        |    2 +-
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |    6 +-
 .../hadoop/ozone/shell/TestOzoneShellHA.java       |   41 +-
 .../hadoop/ozone/shell/TestOzoneTenantShell.java   |    2 +-
 hadoop-ozone/interface-client/pom.xml              |    4 +-
 .../src/main/proto/OmClientProtocol.proto          |   35 +-
 .../interface-client/src/main/resources/proto.lock | 2672 ++++++++++++++++----
 hadoop-ozone/interface-storage/pom.xml             |    4 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   12 +-
 hadoop-ozone/ozone-manager/pom.xml                 |    5 +-
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |   80 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   11 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  231 +-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |   33 +
 .../hadoop/ozone/om/OMMultiTenantManager.java      |    2 +-
 .../hadoop/ozone/om/OMMultiTenantManagerImpl.java  |    2 +-
 .../hadoop/ozone/om/OMPerformanceMetrics.java      |   71 +-
 .../java/org/apache/hadoop/ozone/om/OMStorage.java |    4 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   17 +-
 .../apache/hadoop/ozone/om/OmMetadataReader.java   |   83 +-
 .../apache/hadoop/ozone/om/OzoneConfigUtil.java    |   35 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  452 ++--
 .../hadoop/ozone/om/OzonePrefixPathImpl.java       |    2 -
 .../java/org/apache/hadoop/ozone/om/ScmClient.java |    8 +
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |    4 +-
 .../apache/hadoop/ozone/om/TrashPolicyOzone.java   |  105 +-
 .../org/apache/hadoop/ozone/om/VolumeManager.java  |    2 +-
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |   33 +-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |    8 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |    3 -
 .../om/request/bucket/OMBucketCreateRequest.java   |   17 +-
 .../request/bucket/OMBucketSetPropertyRequest.java |   48 +-
 .../om/request/file/OMDirectoryCreateRequest.java  |   10 +-
 .../file/OMDirectoryCreateRequestWithFSO.java      |   25 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |   11 +-
 .../request/file/OMFileCreateRequestWithFSO.java   |    6 +-
 .../om/request/key/OMAllocateBlockRequest.java     |    6 +-
 .../request/key/OMAllocateBlockRequestWithFSO.java |    3 +-
 .../key/OMDirectoriesPurgeRequestWithFSO.java      |   80 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |   31 +-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |   29 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   25 +-
 .../om/request/key/OMKeyCreateRequestWithFSO.java  |    6 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   69 +-
 .../S3InitiateMultipartUploadRequestWithFSO.java   |   13 +-
 .../S3MultipartUploadCommitPartRequest.java        |    9 +-
 .../S3MultipartUploadCompleteRequest.java          |    3 +-
 .../tenant/OMTenantAssignUserAccessIdRequest.java  |    3 +-
 .../request/s3/tenant/OMTenantCreateRequest.java   |   11 +
 .../response/file/OMDirectoryCreateResponse.java   |    9 +-
 .../file/OMDirectoryCreateResponseWithFSO.java     |   10 +-
 .../response/file/OMFileCreateResponseWithFSO.java |    6 +
 .../key/OMDirectoriesPurgeResponseWithFSO.java     |   19 +-
 .../ozone/om/response/key/OMKeyCommitResponse.java |    6 +
 .../ozone/om/response/key/OMKeyCreateResponse.java |    7 +
 .../response/key/OMKeyDeleteResponseWithFSO.java   |    4 +-
 .../response/key/OMKeysDeleteResponseWithFSO.java  |    4 +-
 .../S3InitiateMultipartUploadResponseWithFSO.java  |   12 +-
 .../om/{ => service}/DirectoryDeletingService.java |  280 +-
 .../ozone/om/{ => service}/KeyDeletingService.java |   33 +-
 .../OMRangerBGSyncService.java                     |    8 +-
 .../om/{ => service}/OpenKeyCleanupService.java    |    5 +-
 .../hadoop/ozone/om/service/package-info.java}     |    9 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   76 +-
 .../hadoop/ozone/security/AWSV4AuthValidator.java  |    4 +-
 .../hadoop/ozone/security/OMCertificateClient.java |  203 ++
 .../webapps/ozoneManager/om-overview.html          |    4 +
 .../apache/hadoop/ozone/om/TestChunkStreams.java   |  150 +-
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  179 +-
 .../hadoop/ozone/om/TestSstFilteringService.java   |   65 +-
 .../om/ratis/TestOzoneManagerRatisServer.java      |    2 +-
 .../ozone/om/request/OMRequestTestUtils.java       |   15 +
 .../ozone/om/request/bucket/TestBucketRequest.java |    1 +
 .../request/bucket/TestOMBucketCreateRequest.java  |   46 +
 .../bucket/TestOMBucketCreateRequestWithFSO.java   |   71 +-
 .../bucket/TestOMBucketSetPropertyRequest.java     |  336 ++-
 .../request/file/TestOMDirectoryCreateRequest.java |   37 +
 .../file/TestOMDirectoryCreateRequestWithFSO.java  |   39 +
 .../om/request/file/TestOMFileCreateRequest.java   |   57 +-
 .../file/TestOMFileCreateRequestWithFSO.java       |   31 +
 .../om/request/key/TestOMAllocateBlockRequest.java |   24 +
 .../TestOMDirectoriesPurgeRequestAndResponse.java  |  297 +++
 .../om/request/key/TestOMKeyCommitRequest.java     |  127 +-
 .../om/request/key/TestOMKeyCreateRequest.java     |   55 +
 .../request/key/TestOMKeyCreateRequestWithFSO.java |    6 +
 .../TestS3MultipartUploadCompleteRequest.java      |    6 +-
 ...estS3MultipartUploadCompleteRequestWithFSO.java |    6 +
 .../file/TestOMDirectoryCreateResponse.java        |   19 +-
 .../file/TestOMDirectoryCreateResponseWithFSO.java |   19 +-
 .../key/TestOMKeysDeleteResponseWithFSO.java       |    7 +-
 .../s3/multipart/TestS3MultipartResponse.java      |   12 +-
 .../TestS3MultipartUploadAbortResponse.java        |    3 +-
 .../TestS3MultipartUploadAbortResponseWithFSO.java |   12 +-
 .../om/{ => service}/TestKeyDeletingService.java   |  177 +-
 .../{ => service}/TestOpenKeyCleanupService.java   |    6 +-
 .../security/TestOmCertificateClientInit.java      |   67 +-
 .../TestOzoneDelegationTokenSecretManager.java     |    1 -
 hadoop-ozone/ozonefs-common/pom.xml                |    8 +-
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |   37 +-
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |   74 +-
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |   61 +-
 .../fs/ozone/BasicRootedOzoneFileSystem.java       |  160 +-
 .../apache/hadoop/fs/ozone/FileStatusAdapter.java  |   57 +-
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |    3 +
 .../apache/hadoop/fs/ozone/OzoneClientUtils.java   |   49 +-
 .../hadoop/fs/ozone/OzoneFSDataStreamOutput.java   |  103 +
 .../hadoop/fs/ozone/OzoneFSOutputStream.java       |   19 +-
 .../hadoop/fs/ozone/TestOzoneClientUtils.java      |   32 -
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |   10 +-
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |   20 +-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |   20 +-
 .../pom.xml                                        |  104 +-
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |    4 +-
 hadoop-ozone/ozonefs-shaded/pom.xml                |    4 +-
 hadoop-ozone/ozonefs/pom.xml                       |    4 +-
 hadoop-ozone/pom.xml                               |   12 +-
 hadoop-ozone/recon-codegen/pom.xml                 |    3 +-
 hadoop-ozone/recon/pom.xml                         |    4 +-
 .../hadoop/ozone/recon/ReconControllerModule.java  |    4 +-
 .../org/apache/hadoop/ozone/recon/ReconServer.java |  137 +-
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |   20 +-
 .../ozone/recon/api/ClusterStateEndpoint.java      |   23 +-
 .../hadoop/ozone/recon/api/ContainerEndpoint.java  |    1 +
 .../ozone/recon/api/TriggerDBSyncEndpoint.java     |   52 +
 .../ozone/recon/api/handlers/BucketHandler.java    |   56 +-
 .../recon/api/handlers/DirectoryEntityHandler.java |   20 +-
 .../ozone/recon/api/handlers/FSOBucketHandler.java |    4 +-
 .../ozone/recon/api/handlers/KeyEntityHandler.java |    3 +-
 .../recon/api/handlers/LegacyBucketHandler.java    |  325 +++
 .../recon/api/handlers/RootEntityHandler.java      |   24 +-
 .../recon/api/types/ClusterStateResponse.java      |   36 +
 .../hadoop/ozone/recon/api/types/NSSummary.java    |    4 +-
 .../ozone/recon/fsck/ContainerHealthTask.java      |   67 +-
 .../ozone/recon/fsck/ReconSafeModeMgrTask.java     |  122 +
 .../hadoop/ozone/recon/scm/PipelineSyncTask.java   |   33 +-
 .../ozone/recon/scm/ReconContainerManager.java     |   27 +-
 .../recon/scm/ReconDatanodeProtocolServer.java     |    2 +-
 .../ozone/recon/scm/ReconDeadNodeHandler.java      |   12 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |    4 +-
 .../ozone/recon/scm/ReconSafeModeManager.java      |   12 +-
 .../ozone/recon/scm/ReconStaleNodeHandler.java     |   58 +
 .../hadoop/ozone/recon/scm/ReconStorageConfig.java |    4 +
 .../scm/ReconStorageContainerManagerFacade.java    |  250 +-
 .../recon/spi/OzoneManagerServiceProvider.java     |    6 +
 .../recon/spi/StorageContainerServiceProvider.java |   21 +
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |  153 +-
 .../impl/ReconContainerMetadataManagerImpl.java    |   34 +-
 .../impl/StorageContainerServiceProviderImpl.java  |   18 +-
 .../hadoop/ozone/recon/tasks/NSSummaryTask.java    |  233 +-
 ...yTask.java => NSSummaryTaskDbEventHandler.java} |   95 +-
 .../ozone/recon/tasks/NSSummaryTaskWithFSO.java    |   71 +-
 .../ozone/recon/tasks/NSSummaryTaskWithLegacy.java |  310 +++
 .../hadoop/ozone/recon/tasks/ReconTaskConfig.java  |   17 +
 .../resources/webapps/recon/ozone-recon-web/NOTICE |    5 -
 .../webapps/recon/ozone-recon-web/api/db.json      |  462 +++-
 .../webapps/recon/ozone-recon-web/api/routes.json  |    4 +-
 .../components/autoReloadPanel/autoReloadPanel.tsx |   18 +-
 .../src/components/navBar/navBar.tsx               |    5 +
 .../ozone-recon-web/src/utils/autoReloadHelper.tsx |    1 +
 .../recon/ozone-recon-web/src/utils/themeIcons.tsx |    6 +-
 .../src/views/datanodes/datanodes.tsx              |   20 +-
 .../src/views/diskUsage/diskUsage.less             |   36 +-
 .../src/views/diskUsage/diskUsage.tsx              |   68 +-
 .../src/views/insights/insights.tsx                |   10 +-
 .../views/missingContainers/missingContainers.tsx  |  240 +-
 .../src/views/overview/overview.tsx                |   49 +-
 .../src/views/pipelines/pipelines.tsx              |    4 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |   28 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |    8 +-
 .../recon/api/TestNSSummaryEndpointWithFSO.java    |  114 +-
 ...O.java => TestNSSummaryEndpointWithLegacy.java} |  389 +--
 .../recon/api/TestTotalOpenContainerCount.java     |  396 +++
 .../ozone/recon/api/TestTriggerDBSyncEndpoint.java |  143 ++
 .../ozone/recon/api/filters/TestAdminFilter.java   |    2 +
 .../ozone/recon/fsck/TestContainerHealthTask.java  |   17 +-
 .../TestContainerHealthTaskRecordGenerator.java    |   42 +-
 .../scm/AbstractReconContainerManagerTest.java     |    4 +-
 .../impl/TestOzoneManagerServiceProviderImpl.java  |   70 +-
 .../ozone/recon/tasks/TestNSSummaryTask.java       |  492 ++++
 .../recon/tasks/TestNSSummaryTaskWithFSO.java      |   21 +-
 ...thFSO.java => TestNSSummaryTaskWithLegacy.java} |  328 ++-
 .../ozone/recon/tasks/TestTableCountTask.java      |    2 +-
 hadoop-ozone/s3gateway/pom.xml                     |    5 +-
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |   72 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |   31 +-
 .../org/apache/hadoop/ozone/s3/util/S3Consts.java  |    2 +
 .../hadoop/ozone/client/ClientProtocolStub.java    |   37 +
 .../hadoop/ozone/client/OzoneBucketStub.java       |    4 +-
 .../hadoop/ozone/client/OzoneOutputStreamStub.java |    2 +-
 .../hadoop/ozone/s3/endpoint/TestEndpointBase.java |  106 +
 .../ozone/s3/endpoint/TestPermissionCheck.java     |    3 +-
 hadoop-ozone/tools/pom.xml                         |    4 +-
 .../ozone/admin/nssummary/DiskUsageSubCommand.java |    7 +-
 .../admin/nssummary/FileSizeDistSubCommand.java    |    7 +-
 .../ozone/admin/nssummary/NSSummaryAdmin.java      |   57 +
 .../ozone/admin/nssummary/NSSummaryCLIUtils.java   |    9 +-
 .../admin/nssummary/QuotaUsageSubCommand.java      |    7 +-
 .../ozone/admin/nssummary/SummarySubCommand.java   |    7 +-
 .../apache/hadoop/ozone/debug/ChunkKeyHandler.java |    1 -
 .../org/apache/hadoop/ozone/debug/DBScanner.java   |   64 +-
 .../ozone/debug/container/ExportSubcommand.java    |    4 +-
 .../ozone/freon/ClosedContainerReplicator.java     |   14 +-
 .../hadoop/ozone/freon/ContentGenerator.java       |   18 +
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |    2 +
 .../hadoop/ozone/freon/KeyGeneratorUtil.java       |   50 +
 .../ozone/freon/OzoneClientKeyGenerator.java       |   29 +-
 .../ozone/freon/OzoneClientKeyReadWriteOps.java    |  245 ++
 .../hadoop/ozone/freon/RangeKeysGenerator.java     |  164 ++
 .../java/org/apache/hadoop/ozone/shell/Shell.java  |   21 +-
 .../ozone/shell/bucket/ClearQuotaHandler.java      |   12 +-
 .../hadoop/ozone/shell/bucket/SetQuotaHandler.java |   12 +-
 .../hadoop/ozone/shell/keys/GetKeyHandler.java     |    4 +-
 .../hadoop/ozone/shell/keys/PutKeyHandler.java     |   67 +-
 .../ozone/shell/volume/ClearQuotaHandler.java      |   12 +-
 .../hadoop/ozone/shell/volume/SetQuotaHandler.java |   11 +-
 pom.xml                                            |  442 +---
 841 files changed, 43992 insertions(+), 13332 deletions(-)

diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8011e35e31,70c22eebfc..e447dd7ba2
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@@ -531,26 -553,24 +564,45 @@@ public final class OzoneConfigKeys 
    public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_OMAUDIT =
        "ozone.audit.log.debug.cmd.list.omaudit";
  
+   // Items listing page size for fs client sub-commands output
+   public static final String
+       OZONE_FS_LISTING_PAGE_SIZE = "ozone.fs.listing.page.size";
+ 
+   public static final int
+       OZONE_FS_LISTING_PAGE_SIZE_DEFAULT = 1024;
+ 
+   public static final int
+       OZONE_FS_MAX_LISTING_PAGE_SIZE = 5000;
+ 
+   public static final String
+       OZONE_FS_LISTING_PAGE_SIZE_MAX = "ozone.fs.listing.page.size.max";
+ 
+ 
+   public static final String FS_TRASH_CLASSNAME = "fs.trash.classname";
+   public static final String FS_TRASH_CLASSNAME_DEFAULT =
+       "org.apache.hadoop.ozone.om.TrashPolicyOzone";
+ 
++
 +  public static final String OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE =
 +      "ozone.om.snapshot.cache.max.size";
 +  public static final int OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT = 10;
 +
 +  public static final String
 +      OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED =
 +      "ozone.om.snapshot.compaction.dag.max.time.allowed";
 +
 +  public static final long
 +      OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT =
 +      TimeUnit.DAYS.toMillis(30);
 +
 +  public static final String
 +      OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL =
 +      "ozone.om.snapshot.compaction.dag.prune.daemon.run.interval";
 +
 +  public static final long
 +      OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT =
 +      TimeUnit.HOURS.toMillis(1);
 +
    /**
     * There is no need to instantiate this class.
     */
diff --cc hadoop-hdds/common/src/main/resources/ozone-default.xml
index 64977ecd32,d23473ec85..dd11e42ce2
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@@ -3407,23 -3466,61 +3515,81 @@@
      </description>
    </property>
  
+   <property>
+     <name>ozone.fs.listing.page.size</name>
+     <value>1024</value>
+     <tag>OZONE, CLIENT</tag>
+     <description>
+       Listing page size value used by client for listing number of items on fs related sub-commands output.
+       Kindly set this config value responsibly to avoid high resource usage. Maximum value restricted is 5000 for
+       optimum performance.
+     </description>
+   </property>
+ 
+   <property>
+     <name>ozone.fs.listing.page.size.max</name>
+     <value>5000</value>
+     <tag>OZONE, OM</tag>
+     <description>
+       Maximum listing page size value enforced by server for listing items on fs related sub-commands output. Kindly set
+       this config value responsibly to avoid high resource usage. Maximum value restricted is 5000 for
+       optimum performance.
+     </description>
+   </property>
+ 
+   <property>
+     <name>ozone.recon.nssummary.flush.db.max.threshold</name>
+     <value>150000</value>
+     <tag>OZONE, RECON, PERFORMANCE</tag>
+     <description>
+       Maximum threshold number of entries to hold in memory for NSSummary task in hashmap before flushing to
+       recon rocks DB namespaceSummaryTable
+     </description>
+   </property>
+ 
+   <property>
+     <name>ozone.fs.datastream.enabled</name>
+     <value>false</value>
+     <tag>OZONE, DATANODE</tag>
+     <description>
+       To enable/disable filesystem write via ratis streaming.
+     </description>
+   </property>
+ 
+   <property>
+     <name>ozone.recon.scm.snapshot.task.initial.delay</name>
+     <value>1m</value>
+     <tag>OZONE, MANAGEMENT, RECON</tag>
+     <description>
+       Initial delay in MINUTES by Recon to request SCM DB Snapshot.
+     </description>
+   </property>
++
+   <property>
+     <name>ozone.recon.scm.snapshot.task.interval.delay</name>
+     <value>24h</value>
+     <tag>OZONE, MANAGEMENT, RECON</tag>
+     <description>
+       Interval in MINUTES by Recon to request SCM DB Snapshot.
+     </description>
+   </property>
 +  <property>
 +    <name>ozone.om.snapshot.compaction.dag.max.time.allowed</name>
 +    <value>30d</value>
 +    <tag>OZONE, OM</tag>
 +    <description>
 +      Maximum time a snapshot is allowed to be in compaction DAG before it gets pruned out by pruning daemon.
 +      Uses millisecond by default when no time unit is specified.
 +    </description>
 +  </property>
 +
 +  <property>
 +    <name>ozone.om.snapshot.compaction.dag.prune.daemon.run.interval</name>
 +    <value>3600s</value>
 +    <tag>OZONE, OM</tag>
 +    <description>
 +      Interval at which compaction DAG pruning daemon thread is running to remove older snapshots with compaction
 +      history from compaction DAG. Uses millisecond by default when no time unit is specified.
 +    </description>
 +  </property>
  </configuration>
diff --cc hadoop-hdds/framework/pom.xml
index 1e08ecebe9,49cd738f54..b2a253ad26
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@@ -147,13 -155,11 +155,18 @@@ https://maven.apache.org/xsd/maven-4.0.
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-params</artifactId>
      </dependency>
+     <dependency>
+       <groupId>com.github.spotbugs</groupId>
+       <artifactId>spotbugs-annotations</artifactId>
+       <scope>compile</scope>
+     </dependency>
 +
 +    <dependency>
 +      <groupId>org.apache.ozone</groupId>
 +      <artifactId>rocksdb-checkpoint-differ</artifactId>
 +      <version>${hdds.version}</version>
 +    </dependency>
 +
    </dependencies>
  
  
diff --cc hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index b96ed1d768,83e9d3b515..c9b36674df
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@@ -17,11 -17,9 +17,12 @@@
   */
  package org.apache.hadoop.hdds.utils.db;
  
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.hadoop.hdds.utils.BooleanTriFunction;
  import org.apache.hadoop.hdds.utils.HddsServerUtil;
  import org.apache.hadoop.hdds.utils.db.managed.ManagedCheckpoint;
+ import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
  import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
  import org.apache.hadoop.hdds.utils.db.managed.ManagedFlushOptions;
  import 
org.apache.hadoop.hdds.utils.db.managed.ManagedIngestExternalFileOptions;
@@@ -72,11 -68,8 +74,11 @@@ import static org.rocksdb.RocksDB.listC
  public final class RocksDatabase {
    static final Logger LOG = LoggerFactory.getLogger(RocksDatabase.class);
  
-   static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys";
+   public static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys";
  
 +  private static List<ColumnFamilyHandle> columnFamilyHandles =
 +      new ArrayList<>();
 +
  
    static IOException toIOException(Object name, String op, RocksDBException 
e) {
      return HddsServerUtil.toIOException(name + ": Failed to " + op, e);
@@@ -145,10 -138,10 +147,11 @@@
          db = ManagedRocksDB.open(dbOptions, dbFile.getAbsolutePath(),
              descriptors, handles);
        }
 +      columnFamilyHandles = handles;
        // init a column family map.
+       AtomicLong counter = new AtomicLong(0);
        for (ColumnFamilyHandle h : handles) {
-         final ColumnFamily f = new ColumnFamily(h);
+         final ColumnFamily f = new ColumnFamily(h, counter);
          columnFamilies.put(f.getName(), f);
        }
        return new RocksDatabase(dbFile, db, dbOptions, writeOptions,
@@@ -369,33 -441,16 +451,37 @@@
      }
    }
  
 +  /**
 +   * @param cfName columnFamily on which flush will run.
 +   * @throws IOException
 +   */
 +  public void flush(String cfName) throws IOException {
 +    ColumnFamilyHandle handle = getColumnFamilyHandle(cfName);
 +    try (ManagedFlushOptions options = new ManagedFlushOptions()) {
 +      options.setWaitForFlush(true);
 +      if (handle != null) {
 +        db.get().flush(options, handle);
 +      } else {
 +        LOG.error("Provided column family doesn't exist."
 +            + " Calling flush on null columnFamily");
 +        flush();
 +      }
 +    } catch (RocksDBException e) {
-       closeOnError(e);
++      closeOnError(e, true);
 +      throw toIOException(this, "flush", e);
 +    }
 +  }
 +
    public void flushWal(boolean sync) throws IOException {
+     assertClose();
      try {
+       counter.incrementAndGet();
        db.get().flushWal(sync);
      } catch (RocksDBException e) {
-       closeOnError(e);
+       closeOnError(e, true);
        throw toIOException(this, "flushWal with sync=" + sync, e);
+     } finally {
+       counter.decrementAndGet();
      }
    }
  
@@@ -408,41 -467,6 +498,41 @@@
      }
    }
  
 +  /**
 +   * @param cfName columnFamily on which compaction will run.
 +   * @throws IOException
 +   */
 +  public void compactRange(String cfName) throws IOException {
 +    ColumnFamilyHandle handle = getColumnFamilyHandle(cfName);
 +    try {
 +      if (handle != null) {
 +        db.get().compactRange(handle);
 +      } else {
 +        LOG.error("Provided column family doesn't exist."
 +            + " Calling compactRange on null columnFamily");
 +        db.get().compactRange();
 +      }
 +    } catch (RocksDBException e) {
-       closeOnError(e);
++      closeOnError(e, true);
 +      throw toIOException(this, "compactRange", e);
 +    }
 +  }
 +
 +  private ColumnFamilyHandle getColumnFamilyHandle(String cfName)
 +      throws IOException {
 +    for (ColumnFamilyHandle cf : getColumnFamilyHandles()) {
 +      try {
 +        if (cfName.equals(new String(cf.getName(), StandardCharsets.UTF_8))) {
 +          return cf;
 +        }
 +      } catch (RocksDBException e) {
-         closeOnError(e);
++        closeOnError(e, true);
 +        throw toIOException(this, "columnFamilyHandle.getName", e);
 +      }
 +    }
 +    return null;
 +  }
 +
    RocksCheckpoint createCheckpoint() {
      return new RocksCheckpoint();
    }
diff --cc hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index 3c28aac8cb,0000000000..8e372ffc3c
mode 100644,000000..100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@@ -1,213 -1,0 +1,213 @@@
 +<?xml version="1.0" encoding="UTF-8"?>
 +<!--
 +  Licensed under the Apache License, Version 2.0 (the "License");
 +  you may not use this file except in compliance with the License.
 +  You may obtain a copy of the License at
 +
 +    http://www.apache.org/licenses/LICENSE-2.0
 +
 +  Unless required by applicable law or agreed to in writing, software
 +  distributed under the License is distributed on an "AS IS" BASIS,
 +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +  See the License for the specific language governing permissions and
 +  limitations under the License. See accompanying LICENSE file.
 +-->
 +<project xmlns="http://maven.apache.org/POM/4.0.0"
 +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
 +  <modelVersion>4.0.0</modelVersion>
 +  <parent>
 +    <groupId>org.apache.ozone</groupId>
 +    <artifactId>hdds</artifactId>
-     <version>1.3.0-SNAPSHOT</version>
++    <version>1.4.0-SNAPSHOT</version>
 +  </parent>
 +
 +  <artifactId>rocksdb-checkpoint-differ</artifactId>
-   <version>1.3.0-SNAPSHOT</version>
++  <version>1.4.0-SNAPSHOT</version>
 +  <description>RocksDB Checkpoint Differ</description>
 +  <name>RocksDB Checkpoint Differ</name>
 +  <packaging>jar</packaging>
 +
 +  <dependencies>
 +    <dependency>
 +      <groupId>org.rocksdb</groupId>
 +      <artifactId>rocksdbjni</artifactId>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.apache.ozone</groupId>
 +      <artifactId>hdds-common</artifactId>
-       <version>1.3.0-SNAPSHOT</version>
++      <version>${hdds.version}</version>
 +    </dependency>
 +    <dependency>
 +      <groupId>com.google.guava</groupId>
 +      <artifactId>guava</artifactId>
 +    </dependency>
 +    <dependency>
 +      <groupId>com.github.vlsi.mxgraph</groupId>
 +      <artifactId>jgraphx</artifactId>
 +      <version>4.2.2</version>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.jgrapht</groupId>
 +      <artifactId>jgrapht-core</artifactId>
 +      <version>1.5.0</version>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.jgrapht</groupId>
 +      <artifactId>jgrapht-guava</artifactId>
 +      <version>1.5.0</version>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.jgrapht</groupId>
 +      <artifactId>jgrapht-ext</artifactId>
 +      <version>1.4.0</version>
 +    </dependency>
 +    <dependency>
 +      <groupId>com.github.spotbugs</groupId>
 +      <artifactId>spotbugs-annotations</artifactId>
 +      <scope>compile</scope>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.apache.commons</groupId>
 +      <artifactId>commons-lang3</artifactId>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.slf4j</groupId>
 +      <artifactId>slf4j-reload4j</artifactId>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.junit.jupiter</groupId>
 +      <artifactId>junit-jupiter-api</artifactId>
 +      <scope>test</scope>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.apache.ozone</groupId>
 +      <artifactId>hdds-test-utils</artifactId>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.junit.jupiter</groupId>
 +      <artifactId>junit-jupiter-params</artifactId>
 +      <scope>test</scope>
 +    </dependency>
 +  </dependencies>
 +
 +  <build>
 +    <resources>
 +      <resource>
 +        <directory>${basedir}/src/main/resources</directory>
 +        <excludes>
 +          <exclude>ozone-version-info.properties</exclude>
 +        </excludes>
 +        <filtering>false</filtering>
 +      </resource>
 +      <resource>
 +        <directory>${basedir}/src/main/resources</directory>
 +        <includes>
 +          <include>ozone-version-info.properties</include>
 +        </includes>
 +        <filtering>true</filtering>
 +      </resource>
 +    </resources>
 +    <plugins>
 +      <plugin>
 +        <groupId>org.apache.hadoop</groupId>
 +        <artifactId>hadoop-maven-plugins</artifactId>
 +        <executions>
 +          <execution>
 +            <id>version-info</id>
 +            <phase>generate-resources</phase>
 +            <goals>
 +              <goal>version-info</goal>
 +            </goals>
 +            <configuration>
 +              <source>
 +                <directory>${basedir}/../</directory>
 +                <includes>
 +                  <include>*/src/main/java/**/*.java</include>
 +                  <include>*/src/main/proto/*.proto</include>
 +                </includes>
 +              </source>
 +            </configuration>
 +          </execution>
 +        </executions>
 +      </plugin>
 +      <plugin>
 +        <groupId>com.github.spotbugs</groupId>
 +        <artifactId>spotbugs-maven-plugin</artifactId>
 +        <configuration>
 +          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
 +        </configuration>
 +      </plugin>
 +      <plugin>
 +        <artifactId>maven-enforcer-plugin</artifactId>
 +        <executions>
 +          <execution>
 +            <id>depcheck</id>
 +            <phase></phase>
 +          </execution>
 +          <execution>
 +            <id>banned-rocksdb-imports</id>
 +            <phase>process-sources</phase>
 +            <goals>
 +              <goal>enforce</goal>
 +            </goals>
 +            <configuration>
 +              <rules>
 +                <RestrictImports>
 +                  <includeTestCode>false</includeTestCode>
 +                  <reason>Use managed RocksObjects under org.apache.hadoop.hdds.utils.db.managed instead.</reason>
 +                  <!-- By default, ban all the classes in org.rocksdb -->
 +                  <bannedImport>org.rocksdb.**</bannedImport>
 +                  <allowedImports>
 +                    <allowedImport>org.rocksdb.AbstractEventListener</allowedImport>
 +                    <allowedImport>org.rocksdb.Checkpoint</allowedImport>
 +                    <allowedImport>org.rocksdb.ColumnFamilyDescriptor</allowedImport>
 +                    <allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
 +                    <allowedImport>org.rocksdb.ColumnFamilyOptions</allowedImport>
 +                    <allowedImport>org.rocksdb.CompactionJobInfo</allowedImport>
 +                    <allowedImport>org.rocksdb.CompressionType</allowedImport>
 +                    <allowedImport>org.rocksdb.DBOptions</allowedImport>
 +                    <allowedImport>org.rocksdb.FlushOptions</allowedImport>
 +                    <allowedImport>org.rocksdb.LiveFileMetaData</allowedImport>
 +                    <allowedImport>org.rocksdb.Options</allowedImport>
 +                    <allowedImport>org.rocksdb.RocksDB</allowedImport>
 +                    <allowedImport>org.rocksdb.RocksDBException</allowedImport>
 +                    <allowedImport>org.rocksdb.SstFileReader</allowedImport>
 +                    <allowedImport>org.rocksdb.TableProperties</allowedImport>
 +                    <allowedImport>org.rocksdb.ReadOptions</allowedImport>
 +                    <allowedImport>org.rocksdb.SstFileReaderIterator</allowedImport>
 +                  </allowedImports>
 +                  <exclusion>org.apache.hadoop.hdds.utils.db.managed.*</exclusion>
 +                </RestrictImports>
 +              </rules>
 +            </configuration>
 +          </execution>
 +        </executions>
 +      </plugin>
 +    </plugins>
 +  </build>
 +  <profiles>
 +    <profile>
 +      <id>k8s-dev</id>
 +      <build>
 +        <plugins>
 +          <plugin>
 +            <groupId>io.fabric8</groupId>
 +            <artifactId>docker-maven-plugin</artifactId>
 +            <configuration>
 +              <images>
 +                <image>
 +                  <name>${user.name}/ozone:${project.version}</name>
 +                  <build>
 +                    <dockerFileDir>${project.basedir}</dockerFileDir>
 +                  </build>
 +                </image>
 +              </images>
 +            </configuration>
 +          </plugin>
 +        </plugins>
 +      </build>
 +    </profile>
 +  </profiles>
 +</project>
diff --cc hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
index cc267d696a,543550b1bf..d22ba6061a
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
@@@ -40,10 -40,8 +40,9 @@@ import org.apache.hadoop.ozone.om.helpe
  import org.apache.hadoop.ozone.om.helpers.TenantStateList;
  import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
  import org.apache.hadoop.ozone.om.helpers.TenantUserList;
- import org.apache.hadoop.ozone.om.protocol.S3Auth;
  import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
  import org.apache.hadoop.ozone.security.acl.OzoneObj;
 +import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport;
  import org.apache.hadoop.security.UserGroupInformation;
  
  import com.google.common.annotations.VisibleForTesting;
diff --cc hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 39f4d99894,f6e0bd1882..10f3cbc439
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@@ -279,7 -276,7 +278,8 @@@ public final class OmUtils 
        // write to OM DB. And therefore it doesn't need a OMClientRequest.
        // Although indirectly the Ranger sync service task could invoke write
        // operation SetRangerServiceVersion.
+     case GetKeyInfo:
 +    case SnapshotDiff:
        return true;
      case CreateVolume:
      case SetVolumeProperty:
diff --cc hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index f9bf6e7809,828dc7e27a..d5ef479a4d
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@@ -340,11 -415,22 +415,23 @@@ Apache License 2.
     org.codehaus.jackson:jackson-jaxrs
     org.codehaus.jackson:jackson-mapper-asl
     org.codehaus.jackson:jackson-xc
-    org.codehaus.jettison:jettison
+    org.eclipse.jetty:jetty-client
+    org.eclipse.jetty:jetty-http
+    org.eclipse.jetty:jetty-io
+    org.eclipse.jetty:jetty-security
+    org.eclipse.jetty:jetty-server
+    org.eclipse.jetty:jetty-servlet
+    org.eclipse.jetty:jetty-util
+    org.eclipse.jetty:jetty-util-ajax
+    org.eclipse.jetty:jetty-webapp
+    org.eclipse.jetty:jetty-xml
     org.hamcrest:hamcrest-all
     org.javassist:javassist
+    org.jetbrains:annotations
+    org.jetbrains.kotlin:kotlin-stdlib
+    org.jetbrains.kotlin:kotlin-stdlib-common
     org.jboss.weld.servlet:weld-servlet
 +   org.jheaps:jheaps
     org.jooq:jooq
     org.jooq:jooq-codegen
     org.jooq:jooq-meta
@@@ -397,7 -468,6 +488,7 @@@ Public Domai
  BSD 3-Clause
  =====================
  
-    com.google.code.findbugs:jsr305
++   com.github.vlsi.mxgraph:jgraphx
     com.google.protobuf:protobuf-java
     com.google.protobuf:protobuf-java-util
     com.google.re2j:re2j
diff --cc hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 49ea0cf6b3,5ee3146435..2aff916854
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@@ -122,12 -122,8 +122,13 @@@ enum Type 
  
    SetRangerServiceVersion = 107;
    RangerBGSync = 109;
 +
    EchoRPC = 110;
+   GetKeyInfo = 111;
 +
-   CreateSnapshot = 111;
-   ListSnapshot = 112;
-   SnapshotDiff = 113;
++  CreateSnapshot = 112;
++  ListSnapshot = 113;
++  SnapshotDiff = 114;
  }
  
  message OMRequest {
@@@ -232,13 -228,8 +233,14 @@@
  
    optional SetRangerServiceVersionRequest   SetRangerServiceVersionRequest = 107;
    optional RangerBGSyncRequest              RangerBGSyncRequest            = 109;
 +
    optional EchoRPCRequest                   EchoRPCRequest                 = 110;
+   optional GetKeyInfoRequest                GetKeyInfoRequest              = 111;
 +
-   optional CreateSnapshotRequest            CreateSnapshotRequest          = 111;
-   optional ListSnapshotRequest              ListSnapshotRequest            = 112;
-   optional SnapshotDiffRequest              snapshotDiffRequest            = 113;
++  optional CreateSnapshotRequest            CreateSnapshotRequest          = 112;
++  optional ListSnapshotRequest              ListSnapshotRequest            = 113;
++  optional SnapshotDiffRequest              snapshotDiffRequest            = 114;
 +
  }
  
  message OMResponse {
@@@ -336,12 -327,8 +338,13 @@@
  
    optional SetRangerServiceVersionResponse   SetRangerServiceVersionResponse = 107;
    optional RangerBGSyncResponse              RangerBGSyncResponse          = 109;
 +
    optional EchoRPCResponse                   EchoRPCResponse               = 110;
+   optional GetKeyInfoResponse                GetKeyInfoResponse            = 111;
 +
-   optional CreateSnapshotResponse            CreateSnapshotResponse        = 111;
-   optional ListSnapshotResponse              ListSnapshotResponse          = 112;
-   optional SnapshotDiffResponse              snapshotDiffResponse          = 113;
++  optional CreateSnapshotResponse            CreateSnapshotResponse        = 112;
++  optional ListSnapshotResponse              ListSnapshotResponse          = 113;
++  optional SnapshotDiffResponse              snapshotDiffResponse          = 114;
  }
  
  enum Status {
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index 5fcf8b0839,ff5c95cccf..965413329c
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@@ -160,13 -107,116 +106,6 @@@ public class BucketManagerImpl implemen
  
    }
  
 -  @Override
 -  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
 -    Objects.requireNonNull(obj);
 -    Objects.requireNonNull(acl);
 -    if (!obj.getResourceType().equals(OzoneObj.ResourceType.BUCKET)) {
 -      throw new IllegalArgumentException("Unexpected argument passed to " +
 -          "BucketManager. OzoneObj type:" + obj.getResourceType());
 -    }
 -    String volume = obj.getVolumeName();
 -    String bucket = obj.getBucketName();
 -    boolean changed = false;
 -    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
 -    try {
 -      String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
 -      OmBucketInfo bucketInfo =
 -          metadataManager.getBucketTable().get(dbBucketKey);
 -      if (bucketInfo == null) {
 -        LOG.debug("Bucket:{}/{} does not exist", volume, bucket);
 -        throw new OMException("Bucket " + bucket + " is not found",
 -            BUCKET_NOT_FOUND);
 -      }
--
-   /**
-    * Returns list of ACLs for given Ozone object.
-    *
-    * @param obj Ozone object.
-    * @throws IOException if there is error.
-    */
 -      changed = bucketInfo.addAcl(acl);
 -      if (changed) {
 -        metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
 -      }
 -    } catch (IOException ex) {
 -      if (!(ex instanceof OMException)) {
 -        LOG.error("Add acl operation failed for bucket:{}/{} acl:{}",
 -            volume, bucket, acl, ex);
 -      }
 -      throw ex;
 -    } finally {
 -      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
 -    }
 -
 -    return changed;
 -  }
 -
 -  @Override
 -  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
 -    Objects.requireNonNull(obj);
 -    Objects.requireNonNull(acl);
 -    if (!obj.getResourceType().equals(OzoneObj.ResourceType.BUCKET)) {
 -      throw new IllegalArgumentException("Unexpected argument passed to " +
 -          "BucketManager. OzoneObj type:" + obj.getResourceType());
 -    }
 -    String volume = obj.getVolumeName();
 -    String bucket = obj.getBucketName();
 -    boolean removed = false;
 -    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
 -    try {
 -      String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
 -      OmBucketInfo bucketInfo =
 -          metadataManager.getBucketTable().get(dbBucketKey);
 -      if (bucketInfo == null) {
 -        LOG.debug("Bucket:{}/{} does not exist", volume, bucket);
 -        throw new OMException("Bucket " + bucket + " is not found",
 -            BUCKET_NOT_FOUND);
 -      }
 -      removed = bucketInfo.removeAcl(acl);
 -      if (removed) {
 -        metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
 -      }
 -    } catch (IOException ex) {
 -      if (!(ex instanceof OMException)) {
 -        LOG.error("Remove acl operation failed for bucket:{}/{} acl:{}",
 -            volume, bucket, acl, ex);
 -      }
 -      throw ex;
 -    } finally {
 -      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
 -    }
 -    return removed;
 -  }
 -
 -  @Override
 -  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
 -    Objects.requireNonNull(obj);
 -    Objects.requireNonNull(acls);
 -    if (!obj.getResourceType().equals(OzoneObj.ResourceType.BUCKET)) {
 -      throw new IllegalArgumentException("Unexpected argument passed to " +
 -          "BucketManager. OzoneObj type:" + obj.getResourceType());
 -    }
 -    String volume = obj.getVolumeName();
 -    String bucket = obj.getBucketName();
 -    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
 -    try {
 -      String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
 -      OmBucketInfo bucketInfo =
 -          metadataManager.getBucketTable().get(dbBucketKey);
 -      if (bucketInfo == null) {
 -        LOG.debug("Bucket:{}/{} does not exist", volume, bucket);
 -        throw new OMException("Bucket " + bucket + " is not found",
 -            BUCKET_NOT_FOUND);
 -      }
 -      bucketInfo.setAcls(acls);
 -      metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
 -    } catch (IOException ex) {
 -      if (!(ex instanceof OMException)) {
 -        LOG.error("Set acl operation failed for bucket:{}/{} acl:{}",
 -            volume, bucket, StringUtils.join(",", acls), ex);
 -      }
 -      throw ex;
 -    } finally {
 -      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
 -    }
 -    return true;
 -  }
 -
    @Override
    public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
      Objects.requireNonNull(obj);
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index e30a3ba280,f500c9edd9..ad86a9ed76
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@@ -47,10 -46,11 +46,9 @@@ import org.apache.hadoop.fs.FileEncrypt
  import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
- import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
  import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
  import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 -import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+ import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
  import org.apache.hadoop.hdds.utils.BackgroundService;
  import org.apache.hadoop.hdds.utils.db.CodecRegistry;
  import org.apache.hadoop.hdds.utils.db.RDBStore;
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
index 42771f66f9,0000000000..9829251f50
mode 100644,000000..100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
@@@ -1,501 -1,0 +1,518 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.hadoop.ozone.om;
 +
 +import java.io.IOException;
 +import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.hdds.server.OzoneAdmins;
 +import org.apache.hadoop.ipc.ProtobufRpcEngine;
 +import org.apache.hadoop.ipc.Server;
 +import org.apache.hadoop.ozone.OzoneAcl;
 +import org.apache.hadoop.ozone.OzoneConsts;
 +import org.apache.hadoop.ozone.audit.AuditAction;
 +import org.apache.hadoop.ozone.audit.AuditEventStatus;
 +import org.apache.hadoop.ozone.audit.AuditLogger;
 +import org.apache.hadoop.ozone.audit.AuditMessage;
 +import org.apache.hadoop.ozone.audit.Auditor;
 +import org.apache.hadoop.ozone.audit.OMAction;
 +import org.apache.hadoop.ozone.om.exceptions.OMException;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 +import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 +import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 +import org.apache.hadoop.ozone.security.acl.RequestContext;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.hadoop.util.ReflectionUtils;
++import org.apache.hadoop.util.Time;
 +import org.slf4j.Logger;
 +import java.net.InetAddress;
 +import java.util.List;
 +import java.util.Map;
 +
 +import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
++import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE;
++import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE_DEFAULT;
++import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE_MAX;
 +import static org.apache.hadoop.ozone.om.KeyManagerImpl.getRemoteUser;
 +import static org.apache.hadoop.ozone.om.OzoneManager.getS3Auth;
 +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 +import org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer;
 +import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
 +import org.apache.hadoop.ozone.security.acl.OzoneObj;
 +import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
 +import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
 +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
++import static org.apache.hadoop.util.MetricUtil.captureLatencyNs;
 +
 +/**
 + * OM Metadata Reading class for the OM and Snapshot managers.
 + *
 + * This abstraction manages all the metadata key/acl reading
 + * from a rocksDb instance, for both the OM and OM snapshots.
 + */
 +public class OmMetadataReader implements IOmMetadataReader, Auditor {
 +  private final KeyManager keyManager;
 +  private final PrefixManager prefixManager;
 +  private final VolumeManager volumeManager;
 +  private final BucketManager bucketManager;
 +  private final OzoneManager ozoneManager;
 +  private final boolean isAclEnabled;
 +  private final IAccessAuthorizer accessAuthorizer;
 +  private final boolean isNativeAuthorizerEnabled;
 +  private final OmMetadataReaderMetrics metrics;
 +  private final Logger log;
 +  private final AuditLogger audit;
++  private OMPerformanceMetrics perfMetrics;
 +
 +  public OmMetadataReader(KeyManager keyManager,
-                    PrefixManager prefixManager,
-                    OzoneManager ozoneManager,
-                    Logger log,
-                    AuditLogger audit,
-                    OmMetadataReaderMetrics omMetadataReaderMetrics) {
++                          PrefixManager prefixManager,
++                          OzoneManager ozoneManager,
++                          Logger log,
++                          AuditLogger audit,
++                          OmMetadataReaderMetrics omMetadataReaderMetrics) {
 +    this.keyManager = keyManager;
 +    this.bucketManager = ozoneManager.getBucketManager();
 +    this.volumeManager = ozoneManager.getVolumeManager();
 +    this.prefixManager = prefixManager;
 +    OzoneConfiguration configuration = ozoneManager.getConfiguration();
 +    this.ozoneManager = ozoneManager;
 +    this.isAclEnabled = ozoneManager.getAclsEnabled();
 +    this.log = log;
 +    this.audit = audit;
 +    boolean allowListAllVolumes = ozoneManager.getAllowListAllVolumes();
-     metrics = omMetadataReaderMetrics;
++    this.metrics = omMetadataReaderMetrics;
++    this.perfMetrics = ozoneManager.getPerfMetrics();
 +    if (isAclEnabled) {
 +      accessAuthorizer = getACLAuthorizerInstance(configuration);
 +      if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
 +        OzoneNativeAuthorizer authorizer =
 +            (OzoneNativeAuthorizer) accessAuthorizer;
 +        isNativeAuthorizerEnabled = true;
 +        authorizer.setVolumeManager(volumeManager);
 +        authorizer.setBucketManager(bucketManager);
 +        authorizer.setKeyManager(keyManager);
 +        authorizer.setPrefixManager(prefixManager);
 +        authorizer.setOzoneAdmins(
 +            new OzoneAdmins(ozoneManager.getOmAdminUsernames()));
 +        authorizer.setAllowListAllVolumes(allowListAllVolumes);
 +      } else {
 +        isNativeAuthorizerEnabled = false;
 +      }
 +    } else {
 +      accessAuthorizer = null;
 +      isNativeAuthorizerEnabled = false;
 +    }
 +  }
 +
 +  /**
 +   * Lookup a key.
 +   *
 +   * @param args - attributes of the key.
 +   * @return OmKeyInfo - the info about the requested key.
 +   * @throws IOException
 +   */
 +  @Override
 +  public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
-     ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
- 
-     if (isAclEnabled) {
-       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
-           bucket.realVolume(), bucket.realBucket(), args.getKeyName());
-     }
- 
++    long start = Time.monotonicNowNanos();
++    ResolvedBucket bucket = captureLatencyNs(
++        perfMetrics.getLookupResolveBucketLatencyNs(),
++        () -> ozoneManager.resolveBucketLink(args));
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
-     args = bucket.update(args);
++    OmKeyArgs resolvedArgs = bucket.update(args);
 +
 +    try {
++      if (isAclEnabled) {
++        captureLatencyNs(perfMetrics.getLookupAclCheckLatencyNs(),
++            () -> checkAcls(ResourceType.KEY, StoreType.OZONE,
++                ACLType.READ, bucket.realVolume(), bucket.realBucket(),
++                args.getKeyName())
++        );
++      }
 +      metrics.incNumKeyLookups();
-       return keyManager.lookupKey(args, getClientAddress());
++      return keyManager.lookupKey(resolvedArgs, getClientAddress());
 +    } catch (Exception ex) {
 +      metrics.incNumKeyLookupFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.READ_KEY,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
 +            auditMap));
 +      }
++
++      perfMetrics.addLookupLatency(Time.monotonicNowNanos() - start);
 +    }
 +  }
 +
 +  @Override
 +  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
 +      String startKey, long numEntries, boolean allowPartialPrefixes)
 +      throws IOException {
 +
-     ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
++    long maxListingPageSize = ozoneManager.getConfiguration().getInt(
++        OZONE_FS_LISTING_PAGE_SIZE_MAX,
++        OZONE_FS_LISTING_PAGE_SIZE_DEFAULT);
++    maxListingPageSize = OzoneConfigUtil.limitValue(numEntries,
++        OZONE_FS_LISTING_PAGE_SIZE, OZONE_FS_LISTING_PAGE_SIZE_MAX,
++        maxListingPageSize);
 +
-     if (isAclEnabled) {
-       checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
-           bucket.realVolume(), bucket.realBucket(), args.getKeyName());
-     }
++    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 +
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
 +    args = bucket.update(args);
 +
 +    try {
++      if (isAclEnabled) {
++        checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
++            bucket.realVolume(), bucket.realBucket(), args.getKeyName());
++      }
 +      metrics.incNumListStatus();
-       return keyManager.listStatus(args, recursive, startKey, numEntries,
-               getClientAddress(), allowPartialPrefixes);
++      return keyManager.listStatus(args, recursive, startKey,
++          maxListingPageSize, getClientAddress(), allowPartialPrefixes);
 +    } catch (Exception ex) {
 +      metrics.incNumListStatusFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_STATUS,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(
 +            OMAction.LIST_STATUS, auditMap));
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 +
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
 +    args = bucket.update(args);
 +
 +    try {
 +      metrics.incNumGetFileStatus();
 +      return keyManager.getFileStatus(args, getClientAddress());
 +    } catch (IOException ex) {
 +      metrics.incNumGetFileStatusFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(
 +          buildAuditMessageForFailure(OMAction.GET_FILE_STATUS, auditMap, 
ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(
 +            buildAuditMessageForSuccess(OMAction.GET_FILE_STATUS, auditMap));
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 +
-     if (isAclEnabled) {
-       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
-           bucket.realVolume(), bucket.realBucket(), args.getKeyName());
-     }
- 
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
 +    args = bucket.update(args);
 +
 +    try {
++      if (isAclEnabled) {
++        checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
++            bucket.realVolume(), bucket.realBucket(), args.getKeyName());
++      }
 +      metrics.incNumLookupFile();
 +      return keyManager.lookupFile(args, getClientAddress());
 +    } catch (Exception ex) {
 +      metrics.incNumLookupFileFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LOOKUP_FILE,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(
 +            OMAction.LOOKUP_FILE, auditMap));
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
 +      String startKey, String keyPrefix, int maxKeys) throws IOException {
 +
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(
 +        Pair.of(volumeName, bucketName));
 +
-     if (isAclEnabled) {
-       checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
-           bucket.realVolume(), bucket.realBucket(), keyPrefix);
-     }
- 
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit();
 +    auditMap.put(OzoneConsts.START_KEY, startKey);
 +    auditMap.put(OzoneConsts.MAX_KEYS, String.valueOf(maxKeys));
 +    auditMap.put(OzoneConsts.KEY_PREFIX, keyPrefix);
 +
 +    try {
++      if (isAclEnabled) {
++        checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
++            bucket.realVolume(), bucket.realBucket(), keyPrefix);
++      }
 +      metrics.incNumKeyLists();
 +      return keyManager.listKeys(bucket.realVolume(), bucket.realBucket(),
 +          startKey, keyPrefix, maxKeys);
 +    } catch (IOException ex) {
 +      metrics.incNumKeyListFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_KEYS,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(OMAction.LIST_KEYS,
 +            auditMap));
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Returns list of ACLs for given Ozone object.
 +   *
 +   * @param obj Ozone object.
 +   * @throws IOException if there is error.
 +   */
 +  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
 +    boolean auditSuccess = true;
 +
 +    try {
 +      if (isAclEnabled) {
 +        checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.READ_ACL,
 +            obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
 +      }
 +      metrics.incNumGetAcl();
 +      switch (obj.getResourceType()) {
 +      case VOLUME:
 +        return volumeManager.getAcl(obj);
 +      case BUCKET:
 +        return bucketManager.getAcl(obj);
 +      case KEY:
 +        return keyManager.getAcl(obj);
 +      case PREFIX:
 +        return prefixManager.getAcl(obj);
 +
 +      default:
 +        throw new OMException("Unexpected resource type: " +
 +            obj.getResourceType(), INVALID_REQUEST);
 +      }
 +    } catch (Exception ex) {
 +      auditSuccess = false;
 +      audit.logReadFailure(
 +          buildAuditMessageForFailure(OMAction.GET_ACL, obj.toAuditMap(), 
ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(
 +            buildAuditMessageForSuccess(OMAction.GET_ACL, obj.toAuditMap()));
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Checks if current caller has acl permissions.
 +   *
 +   * @param resType - Type of ozone resource. Ex volume, bucket.
 +   * @param store   - Store type. i.e Ozone, S3.
 +   * @param acl     - type of access to be checked.
 +   * @param vol     - name of volume
 +   * @param bucket  - bucket name
 +   * @param key     - key
 +   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
 +   */
 +  void checkAcls(ResourceType resType, StoreType store,
 +      ACLType acl, String vol, String bucket, String key)
 +      throws IOException {
 +    UserGroupInformation user;
 +    if (getS3Auth() != null) {
 +      String principal =
 +          OzoneAclUtils.accessIdToUserPrincipal(getS3Auth().getAccessId());
 +      user = UserGroupInformation.createRemoteUser(principal);
 +    } else {
 +      user = ProtobufRpcEngine.Server.getRemoteUser();
 +    }
 +
 +    InetAddress remoteIp = ProtobufRpcEngine.Server.getRemoteIp();
 +    String volumeOwner = ozoneManager.getVolumeOwner(vol, acl, resType);
 +    String bucketOwner = ozoneManager.getBucketOwner(vol, bucket, acl, 
resType);
 +
 +    OzoneAclUtils.checkAllAcls(this, resType, store, acl,
 +        vol, bucket, key, volumeOwner, bucketOwner,
 +        user != null ? user : getRemoteUser(),
 +        remoteIp != null ? remoteIp :
 +            ozoneManager.getOmRpcServerAddr().getAddress(),
 +        remoteIp != null ? remoteIp.getHostName() :
 +            ozoneManager.getOmRpcServerAddr().getHostName());
 +  }
 +
 +  
 +  /**
 +   * CheckAcls for the ozone object.
 +   *
 +   * @return true if permission granted, false if permission denied.
 +   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
 +   *                     and throwOnPermissionDenied set to true.
 +   */
 +  @SuppressWarnings("parameternumber")
 +  public boolean checkAcls(ResourceType resType, StoreType storeType,
 +      ACLType aclType, String vol, String bucket, String key,
 +      UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
 +      boolean throwIfPermissionDenied, String owner)
 +      throws OMException {
 +    OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
 +        .setResType(resType)
 +        .setStoreType(storeType)
 +        .setVolumeName(vol)
 +        .setBucketName(bucket)
 +        .setKeyName(key).build();
 +    RequestContext context = RequestContext.newBuilder()
 +        .setClientUgi(ugi)
 +        .setIp(remoteAddress)
 +        .setHost(hostName)
 +        .setAclType(ACLIdentityType.USER)
 +        .setAclRights(aclType)
 +        .setOwnerName(owner)
 +        .build();
 +
 +    return checkAcls(obj, context, throwIfPermissionDenied);
 +  }
 +
 +  /**
 +   * CheckAcls for the ozone object.
 +   *
 +   * @return true if permission granted, false if permission denied.
 +   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
 +   *                     and throwOnPermissionDenied set to true.
 +   */
 +  public boolean checkAcls(OzoneObj obj, RequestContext context,
 +                           boolean throwIfPermissionDenied)
 +      throws OMException {
 +
 +    if (!accessAuthorizer.checkAccess(obj, context)) {
 +      if (throwIfPermissionDenied) {
 +        String volumeName = obj.getVolumeName() != null ?
 +                "Volume:" + obj.getVolumeName() + " " : "";
 +        String bucketName = obj.getBucketName() != null ?
 +                "Bucket:" + obj.getBucketName() + " " : "";
 +        String keyName = obj.getKeyName() != null ?
 +                "Key:" + obj.getKeyName() : "";
 +        log.warn("User {} doesn't have {} permission to access {} {}{}{}",
 +            context.getClientUgi().getUserName(), context.getAclRights(),
 +            obj.getResourceType(), volumeName, bucketName, keyName);
 +        throw new OMException("User " + context.getClientUgi().getUserName() +
 +            " doesn't have " + context.getAclRights() +
 +            " permission to access " + obj.getResourceType() + " " +
 +            volumeName  + bucketName + keyName, 
ResultCodes.PERMISSION_DENIED);
 +      }
 +      return false;
 +    } else {
 +      return true;
 +    }
 +  }
 +
 +  /**
 +   * Returns an instance of {@link IAccessAuthorizer}.
 +   * Looks up the configuration to see if there is custom class specified.
 +   * Constructs the instance by passing the configuration directly to the
 +   * constructor to achieve thread safety using final fields.
 +   *
 +   * @param conf
 +   * @return IAccessAuthorizer
 +   */
 +  private IAccessAuthorizer getACLAuthorizerInstance(OzoneConfiguration conf) 
{
 +    Class<? extends IAccessAuthorizer> clazz = conf.getClass(
 +        OZONE_ACL_AUTHORIZER_CLASS, OzoneAccessAuthorizer.class,
 +        IAccessAuthorizer.class);
 +    return ReflectionUtils.newInstance(clazz, conf);
 +  }
 +
-   private static String getClientAddress() {
++  static String getClientAddress() {
 +    String clientMachine = Server.getRemoteAddress();
 +    if (clientMachine == null) { //not a RPC client
 +      clientMachine = "";
 +    }
 +    return clientMachine;
 +  }
 +
 +  @Override
 +  public AuditMessage buildAuditMessageForSuccess(AuditAction op,
 +      Map<String, String> auditMap) {
 +
 +    return new AuditMessage.Builder()
 +        .setUser(getRemoteUserName())
 +        .atIp(Server.getRemoteAddress())
 +        .forOperation(op)
 +        .withParams(auditMap)
 +        .withResult(AuditEventStatus.SUCCESS)
 +        .build();
 +  }
 +
 +  @Override
 +  public AuditMessage buildAuditMessageForFailure(AuditAction op,
 +      Map<String, String> auditMap, Throwable throwable) {
 +
 +    return new AuditMessage.Builder()
 +        .setUser(getRemoteUserName())
 +        .atIp(Server.getRemoteAddress())
 +        .forOperation(op)
 +        .withParams(auditMap)
 +        .withResult(AuditEventStatus.FAILURE)
 +        .withException(throwable)
 +        .build();
 +  }
 +
 +  /**
 +   * Returns true if OzoneNativeAuthorizer is enabled and false if otherwise.
 +   *
 +   * @return if native authorizer is enabled.
 +   */
 +  public boolean isNativeAuthorizerEnabled() {
 +    return isNativeAuthorizerEnabled;
 +  }
 +
 +  public IAccessAuthorizer getAccessAuthorizer() {
 +    return accessAuthorizer;
 +  }
 +
 +  private ResourceType getResourceType(OmKeyArgs args) {
 +    if (args.getKeyName() == null || args.getKeyName().length() == 0) {
 +      return ResourceType.BUCKET;
 +    }
 +    return ResourceType.KEY;
 +  }
 +
 +  
 +}
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 3ec7fb8c55,8d2553f1ed..e0016ec257
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@@ -74,10 -70,8 +70,11 @@@ import org.apache.hadoop.hdds.utils.db.
  import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
  import org.apache.hadoop.hdds.utils.db.TableIterator;
  import org.apache.hadoop.ozone.OzoneManagerVersion;
+ import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
 +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
- import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
 +import org.apache.hadoop.ozone.om.request.OMClientRequest;
+ import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
 +import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport;
  import org.apache.hadoop.ozone.util.OzoneNetUtils;
  import org.apache.hadoop.ozone.om.helpers.BucketLayout;
  import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
@@@ -212,7 -211,7 +209,6 @@@ import static org.apache.hadoop.fs.Comm
  import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
  import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
  import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
- import static 
org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 -import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
  import static 
org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
  import static org.apache.hadoop.hdds.utils.HAUtils.getScmInfo;
  import static org.apache.hadoop.ozone.OmUtils.MAX_TRXN_ID;
@@@ -2539,12 -2507,10 +2433,11 @@@ public final class OzoneManager extend
      boolean auditSuccess = true;
      Map<String, String> auditMap = buildAuditMap(volume);
      try {
+       if (isAclEnabled) {
 -        checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.READ, volume,
 -                null, null);
++        omMetadataReader.checkAcls(ResourceType.VOLUME,
++            StoreType.OZONE, ACLType.READ, volume,
++            null, null);
+       }
        metrics.incNumVolumeInfos();
        return volumeManager.getVolumeInfo(volume);
      } catch (Exception ex) {
@@@ -2593,6 -2559,12 +2486,13 @@@
      try {
        metrics.incNumVolumeLists();
        if (isAclEnabled) {
+         String remoteUserName = remoteUserUgi.getShortUserName();
+         // if not admin nor list my own volumes, check ACL.
+         if (!remoteUserName.equals(userName) && !isAdmin(remoteUserUgi)) {
 -          checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST,
 -                  OzoneConsts.OZONE_ROOT, null, null);
++          omMetadataReader.checkAcls(ResourceType.VOLUME,
++              StoreType.OZONE, ACLType.LIST,
++              OzoneConsts.OZONE_ROOT, null, null);
+         }
          // List all volumes first
          List<OmVolumeArgs> listAllVolumes = volumeManager.listVolumes(
              null, prefix, prevKey, maxKeys);
@@@ -2686,6 -2649,10 +2581,11 @@@
      auditMap.put(OzoneConsts.MAX_NUM_OF_BUCKETS,
          String.valueOf(maxNumOfBuckets));
      try {
+       if (isAclEnabled) {
 -        checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST,
 -                volumeName, null, null);
++        omMetadataReader.checkAcls(ResourceType.VOLUME,
++            StoreType.OZONE, ACLType.LIST,
++            volumeName, null, null);
+       }
        metrics.incNumBucketLists();
        return bucketManager.listBuckets(volumeName,
            startKey, prefix, maxNumOfBuckets);
@@@ -2723,6 -2685,10 +2618,11 @@@
      Map<String, String> auditMap = buildAuditMap(volume);
      auditMap.put(OzoneConsts.BUCKET, bucket);
      try {
+       if (isAclEnabled) {
 -        checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ, volume,
 -                bucket, null);
++        omMetadataReader.checkAcls(ResourceType.BUCKET,
++            StoreType.OZONE, ACLType.READ, volume,
++            bucket, null);
+       }
        metrics.incNumBucketInfos();
        final OmBucketInfo bucketInfo =
            bucketManager.getBucketInfo(volume, bucket);
@@@ -2750,14 -2716,135 +2650,78 @@@
     */
    @Override
    public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
 -    long start = Time.monotonicNowNanos();
 -    ResolvedBucket bucket = captureLatencyNs(
 -        perfMetrics.getLookupResolveBucketLatencyNs(),
 -        () -> resolveBucketLink(args));
 -    boolean auditSuccess = true;
 -    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 -
 -    OmKeyArgs resolvedArgs = bucket.update(args);
 -
 -    try {
 -      if (isAclEnabled) {
 -        captureLatencyNs(perfMetrics.getLookupAclCheckLatencyNs(),
 -                () -> checkAcls(ResourceType.KEY, StoreType.OZONE,
 -                        ACLType.READ, bucket.realVolume(), 
bucket.realBucket(),
 -                        args.getKeyName())
 -        );
 -      }
 -      metrics.incNumKeyLookups();
 -      return keyManager.lookupKey(resolvedArgs, getClientAddress());
 -    } catch (Exception ex) {
 -      metrics.incNumKeyLookupFails();
 -      auditSuccess = false;
 -      AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.READ_KEY,
 -          auditMap, ex));
 -      throw ex;
 -    } finally {
 -      if (auditSuccess) {
 -        AUDIT.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
 -            auditMap));
 -      }
 -
 -      perfMetrics.addLookupLatency(Time.monotonicNowNanos() - start);
 -    }
 +    return getReader(args).lookupKey(args);
    }
  
+   @Override
+   public KeyInfoWithVolumeContext getKeyInfo(final OmKeyArgs args,
+                                              boolean assumeS3Context)
+       throws IOException {
+     long start = Time.monotonicNowNanos();
+ 
+     java.util.Optional<S3VolumeContext> s3VolumeContext =
+         java.util.Optional.empty();
+ 
+     final OmKeyArgs resolvedVolumeArgs;
+     if (assumeS3Context) {
+       S3VolumeContext context = getS3VolumeContext();
+       s3VolumeContext = java.util.Optional.of(context);
+       resolvedVolumeArgs = args.toBuilder()
+           .setVolumeName(context.getOmVolumeArgs().getVolume())
+           .build();
+     } else {
+       resolvedVolumeArgs = args;
+     }
+ 
+     final ResolvedBucket bucket = captureLatencyNs(
+         perfMetrics.getGetKeyInfoResolveBucketLatencyNs(),
+         () -> resolveBucketLink(resolvedVolumeArgs));
+ 
+     boolean auditSuccess = true;
+     OmKeyArgs resolvedArgs = bucket.update(args);
+ 
+     try {
+       if (isAclEnabled) {
+         captureLatencyNs(perfMetrics.getGetKeyInfoAclCheckLatencyNs(), () ->
 -                checkAcls(ResourceType.KEY, StoreType.OZONE,
 -                        ACLType.READ, bucket.realVolume(),
 -                        bucket.realBucket(), args.getKeyName())
++            omMetadataReader.checkAcls(ResourceType.KEY,
++                StoreType.OZONE, ACLType.READ,
++                bucket.realVolume(), bucket.realBucket(), args.getKeyName())
+         );
+       }
+ 
+       metrics.incNumGetKeyInfo();
+       OmKeyInfo keyInfo =
 -          keyManager.getKeyInfo(resolvedArgs, getClientAddress());
++          keyManager.getKeyInfo(resolvedArgs,
++              OmMetadataReader.getClientAddress());
+       KeyInfoWithVolumeContext.Builder builder = KeyInfoWithVolumeContext
+           .newBuilder()
+           .setKeyInfo(keyInfo);
+       s3VolumeContext.ifPresent(context -> {
+         builder.setVolumeArgs(context.getOmVolumeArgs());
+         builder.setUserPrincipal(context.getUserPrincipal());
+       });
+       return builder.build();
+     } catch (Exception ex) {
+       metrics.incNumGetKeyInfoFails();
+       auditSuccess = false;
+       AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.READ_KEY,
+           bucket.audit(resolvedVolumeArgs.toAuditMap()), ex));
+       throw ex;
+     } finally {
+       if (auditSuccess) {
+         AUDIT.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
+             bucket.audit(resolvedVolumeArgs.toAuditMap())));
+       }
+       perfMetrics.addGetKeyInfoLatencyNs(Time.monotonicNowNanos() - start);
+     }
+   }
+ 
    @Override
    public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
        String startKey, String keyPrefix, int maxKeys) throws IOException {
+ 
 -    ResolvedBucket bucket = resolveBucketLink(Pair.of(volumeName, 
bucketName));
 -
 -    boolean auditSuccess = true;
 -    Map<String, String> auditMap = bucket.audit();
 -    auditMap.put(OzoneConsts.START_KEY, startKey);
 -    auditMap.put(OzoneConsts.MAX_KEYS, String.valueOf(maxKeys));
 -    auditMap.put(OzoneConsts.KEY_PREFIX, keyPrefix);
 -
 -    try {
 -      if (isAclEnabled) {
 -        checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
 -                bucket.realVolume(), bucket.realBucket(), keyPrefix);
 -      }
 -      metrics.incNumKeyLists();
 -      return keyManager.listKeys(bucket.realVolume(), bucket.realBucket(),
 -          startKey, keyPrefix, maxKeys);
 -    } catch (IOException ex) {
 -      metrics.incNumKeyListFails();
 -      auditSuccess = false;
 -      AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_KEYS,
 -          auditMap, ex));
 -      throw ex;
 -    } finally {
 -      if (auditSuccess) {
 -        AUDIT.logReadSuccess(buildAuditMessageForSuccess(OMAction.LIST_KEYS,
 -            auditMap));
 -      }
 -    }
 +    return getReader(volumeName, bucketName, keyPrefix).listKeys(
-         volumeName, bucketName, startKey, keyPrefix, maxKeys);
++            volumeName, bucketName, startKey, keyPrefix, maxKeys);
    }
  
    @Override
@@@ -2779,8 -2857,11 +2734,12 @@@
      auditMap.put(OzoneConsts.START_KEY, startKeyName);
      auditMap.put(OzoneConsts.KEY_PREFIX, keyPrefix);
      auditMap.put(OzoneConsts.MAX_KEYS, String.valueOf(maxKeys));
- 
      try {
+       if (isAclEnabled) {
 -        checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
 -                volumeName, bucketName, keyPrefix);
++        omMetadataReader.checkAcls(ResourceType.BUCKET,
++            StoreType.OZONE, ACLType.LIST,
++            volumeName, bucketName, keyPrefix);
+       }
        metrics.incNumTrashKeyLists();
        return keyManager.listTrash(volumeName, bucketName,
            startKeyName, keyPrefix, maxKeys);
@@@ -3385,12 -3514,49 +3344,18 @@@
    }
  
    @Override
+   public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
 -      String startKey, long numEntries)
 -      throws IOException {
++                                          String startKey, long numEntries)
++          throws IOException {
+     return listStatus(args, recursive, startKey, numEntries, false);
+   }
+ 
    public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
        String startKey, long numEntries, boolean allowPartialPrefixes)
        throws IOException {
 -    long maxListingPageSize = configuration.getInt(
 -        OZONE_FS_LISTING_PAGE_SIZE_MAX,
 -        OZONE_FS_LISTING_PAGE_SIZE_DEFAULT);
 -    maxListingPageSize = OzoneConfigUtil.limitValue(numEntries,
 -        OZONE_FS_LISTING_PAGE_SIZE, OZONE_FS_LISTING_PAGE_SIZE_MAX,
 -        maxListingPageSize);
  
 -    ResolvedBucket bucket = resolveBucketLink(args);
 -
 -    boolean auditSuccess = true;
 -    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 -
 -    args = bucket.update(args);
 -
 -    try {
 -      if (isAclEnabled) {
 -        checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
 -                bucket.realVolume(), bucket.realBucket(), args.getKeyName());
 -      }
 -      metrics.incNumListStatus();
 -      return keyManager.listStatus(args, recursive, startKey,
 -          maxListingPageSize, getClientAddress(), allowPartialPrefixes);
 -    } catch (Exception ex) {
 -      metrics.incNumListStatusFails();
 -      auditSuccess = false;
 -      AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_STATUS,
 -          auditMap, ex));
 -      throw ex;
 -    } finally {
 -      if (auditSuccess) {
 -        AUDIT.logReadSuccess(buildAuditMessageForSuccess(
 -            OMAction.LIST_STATUS, auditMap));
 -      }
 -    }
 +    return getReader(args).listStatus(args, recursive,
-         startKey, numEntries, allowPartialPrefixes);
++            startKey, numEntries, allowPartialPrefixes);
    }
  
    /**
@@@ -4258,26 -4465,64 +4223,87 @@@
    private BucketLayout getBucketLayout() {
      return BucketLayout.DEFAULT;
    }
+ 
+   void saveNewCertId(String certId) {
+     try {
+       omStorage.setOmCertSerialId(certId);
+       omStorage.persistCurrentState();
+     } catch (IOException ex) {
+       // New cert ID cannot be persisted into VERSION file.
+       LOG.error("Failed to persist new cert ID {} to VERSION file." +
+           "Terminating OzoneManager...", certId, ex);
+       shutDown("OzoneManage shutdown because VERSION file persist failure.");
+     }
+   }
+ 
+   public static HddsProtos.OzoneManagerDetailsProto getOmDetailsProto(
+       OzoneConfiguration config, String omID) {
+     boolean flexibleFqdnResolutionEnabled = config.getBoolean(
+         OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED,
+         OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED_DEFAULT);
+     InetSocketAddress omRpcAdd = OmUtils.getOmAddress(config);
+     String ip = null;
+ 
+     boolean addressResolved = omRpcAdd != null && omRpcAdd.getAddress() != 
null;
+     if (flexibleFqdnResolutionEnabled && !addressResolved && omRpcAdd != 
null) {
+       InetSocketAddress omRpcAddWithHostName =
+           OzoneNetUtils.getAddressWithHostNameLocal(omRpcAdd);
+       if (omRpcAddWithHostName != null
+           && omRpcAddWithHostName.getAddress() != null) {
+         addressResolved = true;
+         ip = omRpcAddWithHostName.getAddress().getHostAddress();
+       }
+     }
+ 
+     if (!addressResolved) {
+       LOG.error("Incorrect om rpc address. omRpcAdd:{}", omRpcAdd);
+       throw new RuntimeException("Can't get SCM signed certificate. " +
+           "omRpcAdd: " + omRpcAdd);
+     }
+ 
+     if (ip == null) {
+       ip = omRpcAdd.getAddress().getHostAddress();
+     }
+ 
+     String hostname = omRpcAdd.getHostName();
+     int port = omRpcAdd.getPort();
+ 
+     HddsProtos.OzoneManagerDetailsProto.Builder omDetailsProtoBuilder =
+         HddsProtos.OzoneManagerDetailsProto.newBuilder()
+             .setHostName(hostname)
+             .setIpAddress(ip)
+             .setUuid(omID)
+             .addPorts(HddsProtos.Port.newBuilder()
+                 .setName(RPC_PORT)
+                 .setValue(port)
+                 .build());
+ 
+     HddsProtos.OzoneManagerDetailsProto omDetailsProto =
+         omDetailsProtoBuilder.build();
+     LOG.info("OzoneManager ports added:{}", omDetailsProto.getPortsList());
+     return omDetailsProto;
+   }
++
 +  private IOmMetadataReader getReader(OmKeyArgs keyArgs) throws IOException {
 +    return omSnapshotManager.checkForSnapshot(
 +        keyArgs.getVolumeName(), keyArgs.getBucketName(), 
keyArgs.getKeyName());
 +  }
 +
 +  private IOmMetadataReader getReader(String volumeName, String bucketName,
 +      String key) throws IOException {
 +    return omSnapshotManager.checkForSnapshot(volumeName, bucketName, key);
 +  }
 +
 +  private IOmMetadataReader getReader(OzoneObj ozoneObj) throws IOException {
 +    return omSnapshotManager.checkForSnapshot(
 +        ozoneObj.getVolumeName(), ozoneObj.getBucketName(),
 +        ozoneObj.getKeyName());
 +  }
 +
 +  public SnapshotDiffReport snapshotDiff(String volume, String bucket,
 +                                         String fromSnapshot, String 
toSnapshot)
 +      throws IOException {
 +    return omSnapshotManager.getSnapshotDiffReport(volume, bucket,
 +        fromSnapshot, toSnapshot);
 +  }
  }
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index 11e261f41c,7d8c8fe517..bcd099e961
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@@ -100,12 -84,117 +84,6 @@@ public class VolumeManagerImpl implemen
      }
    }
  
-   /**
-    * Returns list of ACLs for given Ozone object.
-    *
-    * @param obj Ozone object.
-    * @throws IOException if there is error.
-    */
 -  @Override
 -  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
 -    Objects.requireNonNull(obj);
 -    Objects.requireNonNull(acl);
 -    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
 -      throw new IllegalArgumentException("Unexpected argument passed to " +
 -          "VolumeManager. OzoneObj type:" + obj.getResourceType());
 -    }
 -    String volume = obj.getVolumeName();
 -    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
 -    try {
 -      String dbVolumeKey = metadataManager.getVolumeKey(volume);
 -      OmVolumeArgs volumeArgs =
 -          metadataManager.getVolumeTable().get(dbVolumeKey);
 -      if (volumeArgs == null) {
 -        LOG.debug("volume:{} does not exist", volume);
 -        throw new OMException("Volume " + volume + " is not found",
 -            ResultCodes.VOLUME_NOT_FOUND);
 -      }
 -      if (volumeArgs.addAcl(acl)) {
 -        metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
 -        return true;
 -      }
 -    } catch (IOException ex) {
 -      if (!(ex instanceof OMException)) {
 -        LOG.error("Add acl operation failed for volume:{} acl:{}",
 -            volume, acl, ex);
 -      }
 -      throw ex;
 -    } finally {
 -      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
 -    }
 -
 -    return false;
 -  }
 -
 -  @Override
 -  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
 -    Objects.requireNonNull(obj);
 -    Objects.requireNonNull(acl);
 -    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
 -      throw new IllegalArgumentException("Unexpected argument passed to " +
 -          "VolumeManager. OzoneObj type:" + obj.getResourceType());
 -    }
 -    String volume = obj.getVolumeName();
 -    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
 -    try {
 -      String dbVolumeKey = metadataManager.getVolumeKey(volume);
 -      OmVolumeArgs volumeArgs =
 -          metadataManager.getVolumeTable().get(dbVolumeKey);
 -      if (volumeArgs == null) {
 -        LOG.debug("volume:{} does not exist", volume);
 -        throw new OMException("Volume " + volume + " is not found",
 -            ResultCodes.VOLUME_NOT_FOUND);
 -      }
 -      if (volumeArgs.removeAcl(acl)) {
 -        metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
 -        return true;
 -      }
 -
 -      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
 -    } catch (IOException ex) {
 -      if (!(ex instanceof OMException)) {
 -        LOG.error("Remove acl operation failed for volume:{} acl:{}",
 -            volume, acl, ex);
 -      }
 -      throw ex;
 -    } finally {
 -      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
 -    }
 -
 -    return false;
 -  }
 -
 -  @Override
 -  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException 
{
 -    Objects.requireNonNull(obj);
 -    Objects.requireNonNull(acls);
 -
 -    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
 -      throw new IllegalArgumentException("Unexpected argument passed to " +
 -          "VolumeManager. OzoneObj type:" + obj.getResourceType());
 -    }
 -    String volume = obj.getVolumeName();
 -    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
 -    try {
 -      String dbVolumeKey = metadataManager.getVolumeKey(volume);
 -      OmVolumeArgs volumeArgs =
 -          metadataManager.getVolumeTable().get(dbVolumeKey);
 -      if (volumeArgs == null) {
 -        LOG.debug("volume:{} does not exist", volume);
 -        throw new OMException("Volume " + volume + " is not found",
 -            ResultCodes.VOLUME_NOT_FOUND);
 -      }
 -      volumeArgs.setAcls(acls);
 -      metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
 -
 -      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
 -    } catch (IOException ex) {
 -      if (!(ex instanceof OMException)) {
 -        LOG.error("Set acl operation failed for volume:{} acls:{}",
 -            volume, acls, ex);
 -      }
 -      throw ex;
 -    } finally {
 -      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
 -    }
 -
 -    return true;
 -  }
 -
    @Override
    public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
      Objects.requireNonNull(obj);
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 932196b54c,9f31399230..6ab84a5f02
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@@ -280,16 -282,14 +285,24 @@@ public class OzoneManagerRequestHandle
              request.getTenantListUserRequest());
          responseBuilder.setTenantListUserResponse(listUserResponse);
          break;
+       case GetKeyInfo:
+         responseBuilder.setGetKeyInfoResponse(
+             getKeyInfo(request.getGetKeyInfoRequest(), request.getVersion()));
+         break;
 +      case ListSnapshot:
 +        OzoneManagerProtocolProtos.ListSnapshotResponse listSnapshotResponse =
 +            getSnapshots(request.getListSnapshotRequest());
 +        responseBuilder.setListSnapshotResponse(listSnapshotResponse);
 +        break;
 +      case SnapshotDiff:
 +        SnapshotDiffResponse snapshotDiffReport = snapshotDiff(
 +            request.getSnapshotDiffRequest());
 +        responseBuilder.setSnapshotDiffResponse(snapshotDiffReport);
 +        break;
+       case EchoRPC:
+         EchoRPCResponse echoRPCResponse =
 -                echoRPC(request.getEchoRPCRequest());
++            echoRPC(request.getEchoRPCRequest());
+         responseBuilder.setEchoRPCResponse(echoRPCResponse);
        default:
          responseBuilder.setSuccess(false);
          responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@@ -1219,15 -1201,19 +1224,30 @@@
      return impl;
    }
  
+   private EchoRPCResponse echoRPC(EchoRPCRequest req) {
 -    EchoRPCResponse.Builder builder = 
++    EchoRPCResponse.Builder builder =
+             EchoRPCResponse.newBuilder();
+ 
 -    byte[] payloadBytes = new byte[0];    
++    byte[] payloadBytes = new byte[0];
+     int payloadRespSize = Math.min(
+             req.getPayloadSizeResp()
+                     * RPC_PAYLOAD_MULTIPLICATION_FACTOR, MAX_SIZE_KB);
+     if (payloadRespSize > 0) {
+       payloadBytes = RandomUtils.nextBytes(payloadRespSize);
+     }
+     builder.setPayload(ByteString.copyFrom(payloadBytes));
+     return builder.build();
+   }
+ 
 +  private OzoneManagerProtocolProtos.ListSnapshotResponse getSnapshots(
 +      OzoneManagerProtocolProtos.ListSnapshotRequest request)
 +      throws IOException {
 +    List<SnapshotInfo> snapshotInfos = impl.listSnapshot(
 +        request.getVolumeName(), request.getBucketName());
 +    List<OzoneManagerProtocolProtos.SnapshotInfo> snapshotInfoList =
 +        snapshotInfos.stream().map(SnapshotInfo::getProtobuf)
 +            .collect(Collectors.toList());
 +    return OzoneManagerProtocolProtos.ListSnapshotResponse.newBuilder()
 +        .addAllSnapshotInfo(snapshotInfoList).build();
 +  }
  }
diff --cc hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
index 35dc07a9ad,0000000000..acc2803c50
mode 100644,000000..100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
@@@ -1,237 -1,0 +1,294 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +package org.apache.hadoop.ozone.om;
 +
 +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 +import org.apache.commons.lang3.RandomStringUtils;
++import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
++import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
++import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 +import org.apache.hadoop.hdds.server.ServerUtils;
 +import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
 +import org.apache.hadoop.hdds.utils.db.DBProfile;
 +import org.apache.hadoop.hdds.utils.db.RDBStore;
 +import org.apache.hadoop.ozone.OzoneConsts;
++import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
++import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
++import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
++import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
++import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 +import org.apache.hadoop.security.authentication.client.AuthenticationException;
 +import org.apache.ozone.test.GenericTestUtils;
 +import org.apache.ratis.util.ExitUtils;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.rules.TemporaryFolder;
 +import org.rocksdb.LiveFileMetaData;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Files;
 +import java.nio.file.Paths;
 +import java.util.ArrayList;
++import java.util.Collections;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +
 +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
 +import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 +
 +/**
 + * Test SST Filtering Service.
 + */
 +public class TestSstFilteringService {
 +  @Rule
 +  public TemporaryFolder folder = new TemporaryFolder();
 +  private OzoneManagerProtocol writeClient;
 +  private OzoneManager om;
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(TestSstFilteringService.class);
 +
 +  @BeforeClass
 +  public static void setup() {
 +    ExitUtils.disableSystemExit();
 +  }
 +
 +  private OzoneConfiguration createConfAndInitValues() throws IOException {
 +    OzoneConfiguration conf = new OzoneConfiguration();
 +    File newFolder = folder.newFolder();
 +    if (!newFolder.exists()) {
 +      Assert.assertTrue(newFolder.mkdirs());
 +    }
 +    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
 +    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
 +    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
 +        TimeUnit.MILLISECONDS);
 +    conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 100,
 +        TimeUnit.MILLISECONDS);
 +    conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
 +    conf.setQuietMode(false);
 +
 +    return conf;
 +  }
 +
 +  @After
 +  public void cleanup() throws Exception {
 +    om.stop();
 +  }
 +
 +  /**
 +   * Test checks whether for existing snapshots
 +   * the checkpoint should not have any sst files that do not correspond to
 +   * the bucket on which create snapshot command was issued.
 +   *
 +   * The SSTFiltering service deletes only the last level of
 +   * sst file (rocksdb behaviour).
 +   *
 +   * 1. Create Keys for vol1/buck1 (L0 ssts will be created for vol1/buck1)
 +   * 2. compact the db (new level SSTS will be created for vol1/buck1)
 +   * 3. Create keys for vol1/buck2 (L0 ssts will be created for vol1/buck2)
 +   * 4. Take snapshot on vol1/buck2.
 +   * 5. The snapshot will contain compacted sst files pertaining to vol1/buck1
 +   *    Wait till the BG service deletes these.
 +   *
 +   * @throws IOException - on Failure.
 +   */
 +
 +  @Test
 +  public void testIrrelevantSstFileDeletion()
 +      throws IOException, TimeoutException, InterruptedException,
 +      AuthenticationException {
 +    OzoneConfiguration conf = createConfAndInitValues();
 +    OmTestManagers omTestManagers = new OmTestManagers(conf);
 +    KeyManager keyManager = omTestManagers.getKeyManager();
 +    writeClient = omTestManagers.getWriteClient();
 +    om = omTestManagers.getOzoneManager();
 +    RDBStore store = (RDBStore) om.getMetadataManager().getStore();
 +
 +    final int keyCount = 100;
 +    createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1);
 +    SstFilteringService sstFilteringService =
 +        (SstFilteringService) keyManager.getSnapshotSstFilteringService();
 +
 +    String rocksDbDir = om.getRocksDbDirectory();
 +
 +    store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
 +
 +    createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1);
 +    store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
 +
 +    int level0FilesCount = 0;
 +    int totalFileCount = 0;
 +
 +    List<LiveFileMetaData> initialsstFileList = store.getDb().getSstFileList();
 +    for (LiveFileMetaData fileMetaData : initialsstFileList) {
 +      totalFileCount++;
 +      if (fileMetaData.level() == 0) {
 +        level0FilesCount++;
 +      }
 +    }
 +    LOG.debug("Total files : {}", totalFileCount);
 +    LOG.debug("Total L0 files: {}", level0FilesCount);
 +
 +    Assert.assertEquals(totalFileCount, level0FilesCount);
 +
 +    store.getDb().compactRange(OmMetadataManagerImpl.KEY_TABLE);
 +
 +    int level0FilesCountAfterCompact = 0;
 +    int totalFileCountAfterCompact = 0;
 +    int nonlevel0FilesCountAfterCompact = 0;
 +    List<LiveFileMetaData> nonlevelOFiles = new ArrayList<>();
 +
 +    for (LiveFileMetaData fileMetaData : store.getDb().getSstFileList()) {
 +      totalFileCountAfterCompact++;
 +      if (fileMetaData.level() == 0) {
 +        level0FilesCountAfterCompact++;
 +      } else {
 +        nonlevel0FilesCountAfterCompact++;
 +        nonlevelOFiles.add(fileMetaData);
 +      }
 +    }
 +
 +    LOG.debug("Total files : {}", totalFileCountAfterCompact);
 +    LOG.debug("Total L0 files: {}", level0FilesCountAfterCompact);
 +    LOG.debug("Total non L0/compacted files: {}",
 +        nonlevel0FilesCountAfterCompact);
 +
 +    Assert.assertTrue(nonlevel0FilesCountAfterCompact > 0);
 +
 +    createKeys(keyManager, "vol1", "buck2", keyCount, 1);
 +
 +    store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE);
 +
 +    List<LiveFileMetaData> allFiles = store.getDb().getSstFileList();
 +
 +    writeClient.createSnapshot("vol1", "buck2", "snapshot1");
 +
 +    GenericTestUtils.waitFor(
 +        () -> sstFilteringService.getSnapshotFilteredCount().get() >= 1, 1000,
 +        10000);
 +
 +    Assert
 +        .assertEquals(1, sstFilteringService.getSnapshotFilteredCount().get());
 +
 +    SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
 +        .get(SnapshotInfo.getTableKey("vol1", "buck2", "snapshot1"));
 +
 +    String dbSnapshots = rocksDbDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
 +    String snapshotDirName =
 +        dbSnapshots + OM_KEY_PREFIX + OM_DB_NAME + snapshotInfo
 +            .getCheckpointDirName();
 +
 +    for (LiveFileMetaData file : allFiles) {
 +      File sstFile =
 +          new File(snapshotDirName + OM_KEY_PREFIX + file.fileName());
 +      if (nonlevelOFiles.stream()
 +          .anyMatch(o -> file.fileName().equals(o.fileName()))) {
 +        Assert.assertFalse(sstFile.exists());
 +      } else {
 +        Assert.assertTrue(sstFile.exists());
 +      }
 +    }
 +
 +    List<String> processedSnapshotIds = Files
 +        .readAllLines(Paths.get(dbSnapshots, OzoneConsts.FILTERED_SNAPSHOTS));
 +    Assert.assertTrue(
 +        processedSnapshotIds.contains(snapshotInfo.getSnapshotID()));
 +
 +  }
 +
 +  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
 +  private void createKeys(KeyManager keyManager, String volumeName,
 +      String bucketName, int keyCount, int numBlocks) throws IOException {
 +    for (int x = 0; x < keyCount; x++) {
 +      String keyName =
 +          String.format("key%s", RandomStringUtils.randomAlphanumeric(5));
 +      // Create Volume and Bucket
-       TestKeyDeletingService
-           .createVolumeAndBucket(keyManager, volumeName, bucketName, false);
++      createVolumeAndBucket(keyManager, volumeName, bucketName, false);
 +
 +      // Create the key
-       TestKeyDeletingService
-           .createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
++      createAndCommitKey(writeClient, keyManager, volumeName, bucketName,
 +              keyName, numBlocks);
 +    }
 +  }
 +
++  private static void createVolumeAndBucket(KeyManager keyManager,
++                                            String volumeName,
++                                            String bucketName,
++                                            boolean isVersioningEnabled)
++      throws IOException {
++    // cheat here, just create a volume and bucket entry so that we can
++    // create the keys, we put the same data for key and value since the
++    // system does not decode the object
++    OMRequestTestUtils.addVolumeToOM(keyManager.getMetadataManager(),
++        OmVolumeArgs.newBuilder()
++            .setOwnerName("o")
++            .setAdminName("a")
++            .setVolume(volumeName)
++            .build());
++
++    OMRequestTestUtils.addBucketToOM(keyManager.getMetadataManager(),
++        OmBucketInfo.newBuilder().setVolumeName(volumeName)
++            .setBucketName(bucketName)
++            .setIsVersionEnabled(isVersioningEnabled)
++            .build());
++  }
++
++  private static OmKeyArgs createAndCommitKey(OzoneManagerProtocol writeClient,
++                                              KeyManager keyManager,
++                                              String volumeName,
++                                              String bucketName,
++                                              String keyName,
++                                              int numBlocks)
++      throws IOException {
++
++    OmKeyArgs keyArg =
++        new OmKeyArgs.Builder()
++            .setVolumeName(volumeName)
++            .setBucketName(bucketName)
++            .setKeyName(keyName)
++            .setAcls(Collections.emptyList())
++            .setReplicationConfig(StandaloneReplicationConfig.getInstance(
++                HddsProtos.ReplicationFactor.ONE))
++            .setLocationInfoList(new ArrayList<>())
++            .build();
++    //Open and Commit the Key in the Key Manager.
++    OpenKeySession session = writeClient.openKey(keyArg);
++    for (int i = 0; i < numBlocks; i++) {
++      keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(),
++          new ExcludeList()));
++    }
++    writeClient.commitKey(keyArg, session.getId());
++    return keyArg;
++  }
++
 +}
diff --cc hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 1b7e2d8264,c274b89a14..c0cc5239e3
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@@ -72,9 -72,11 +72,12 @@@ import static org.apache.hadoop.fs.ozon
  import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE_DEFAULT;
+ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE;
+ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE_DEFAULT;
+ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_MAX_LISTING_PAGE_SIZE;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
  import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
  import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
  
diff --cc hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 4e8f28ea7b,b78936678f..269e9ded1e
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@@ -71,9 -72,11 +72,12 @@@ import static org.apache.hadoop.fs.ozon
  import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE_DEFAULT;
+ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE;
+ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE_DEFAULT;
+ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_MAX_LISTING_PAGE_SIZE;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
  import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
  import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
  import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_EMPTY;
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index 53c3cb9a22,e8da93b366..e876b7fec1
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@@ -572,23 -585,26 +586,46 @@@ public class ClientProtocolStub impleme
      return null;
    }
  
+   @Override
+   public OzoneDataStreamOutput createStreamKey(
+       String volumeName, String bucketName, String keyName, long size,
+       ReplicationConfig replicationConfig, Map<String, String> metadata)
+       throws IOException {
+     return null;
+   }
+ 
+   @Override
+   public OzoneDataStreamOutput createMultipartStreamKey(
+       String volumeName, String bucketName, String keyName, long size,
+       int partNumber, String uploadID) throws IOException {
+     return null;
+   }
+ 
+   @Override
+   public OzoneDataStreamOutput createStreamFile(
+       String volumeName, String bucketName, String keyName, long size,
+       ReplicationConfig replicationConf, boolean overWrite, boolean recursive)
+       throws IOException {
+     return null;
+   }
++
 +  @Override
 +  public String createSnapshot(String volumeName,
 +      String bucketName, String snapshotName)
 +      throws IOException {
 +    return "";
 +  }
 +
 +  @Override
 +  public List<OzoneSnapshot> listSnapshot(String volumeName, String bucketName)
 +      throws IOException {
 +    return null;
 +  }
 +
 +  public SnapshotDiffReport snapshotDiff(String volumeName, String bucketName,
 +                                         String fromSnapshot, String toSnapshot)
 +      throws IOException {
 +    return null;
 +  }
 +
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to