This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-4944
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 7719e23eec4c4122d0fe7d28ca3965ae198c54b8
Merge: 9280b67553 0b6f46714f
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Apr 19 12:08:26 2022 -0700

    Merge remote-tracking branch 'asf/master' into HDDS-4944
    
    Conflicts:
    hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
    hadoop-ozone/dist/src/main/smoketest/upgrade/generate.robot
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
    hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
    hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
    
    Also modified:
    hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
    hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
    hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
    hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java

 .github/workflows/post-commit.yml                  |  12 +-
 dev-support/annotations/pom.xml                    | 114 ++++
 .../RequestFeatureValidatorProcessor.java          | 289 ++++++++++
 .../org/apache/ozone/annotations/package-info.java |   5 +
 .../services/javax.annotation.processing.Processor |  19 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    |   2 +-
 hadoop-hdds/common/pom.xml                         |  13 +-
 ...DatanodeVersions.java => ComponentVersion.java} |  24 +-
 .../org/apache/hadoop/hdds/DatanodeVersion.java    |  71 +++
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   6 +
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   1 +
 .../hadoop/hdds/client/RatisReplicationConfig.java |  34 +-
 .../hadoop/hdds/client/ReplicationConfig.java      |   4 +-
 .../hdds/client/StandaloneReplicationConfig.java   |  34 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  13 +-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  66 ++-
 .../RequestTypeDependentRetryPolicyCreator.java    |   6 +-
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   2 +
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |   4 +-
 .../hadoop/hdds/scm/container/ContainerInfo.java   |  21 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |   9 +-
 .../apache/hadoop/hdds/scm/net/InnerNodeImpl.java  |   2 +-
 .../protocol/StorageContainerLocationProtocol.java |  21 +-
 .../ContainerCommandResponseBuilders.java          |  13 +-
 .../org/apache/hadoop/ozone/ClientVersion.java     |  75 +++
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  34 +-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   4 +
 .../apache/hadoop/ozone/OzoneManagerVersion.java   |  72 +++
 .../org/apache/hadoop/ozone/audit/AuditLogger.java |  55 +-
 .../apache/hadoop/ozone/audit/AuditLoggerType.java |   3 +-
 .../apache/hadoop/ozone/audit/AuditMessage.java    |  30 +-
 .../audit/{AuditLoggerType.java => S3GAction.java} |  41 +-
 .../org/apache/hadoop/ozone/lock/ActiveLock.java   |  35 +-
 .../org/apache/hadoop/ozone/lock/LockManager.java  |  49 +-
 .../common/src/main/resources/ozone-default.xml    |  91 ++-
 .../hdds/TestComponentVersionInvariants.java       |  98 ++++
 .../client/TestReplicationConfigValidator.java     |  16 +-
 .../hadoop/hdds/protocol/TestDatanodeDetails.java  |  11 +-
 .../hadoop/hdds/scm/pipeline/MockPipeline.java     |   4 +-
 .../hadoop/hdds/scm/pipeline/TestPipeline.java     |   9 +-
 .../hadoop/ozone/audit/TestOzoneAuditLogger.java   |  21 +
 .../ozone/container/ContainerTestHelper.java       |  96 +---
 .../TestDefaultUpgradeFinalizationExecutor.java    |   3 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   8 +-
 .../ozone/container/common/impl/ContainerData.java |  46 +-
 .../ozone/container/common/impl/ContainerSet.java  |  16 +-
 .../common/interfaces/ContainerInspector.java      |  72 +++
 .../ozone/container/common/interfaces/Handler.java |  12 +-
 .../common/report/ContainerReportPublisher.java    |   5 +-
 .../common/report/IncrementalReportSender.java     |  30 +
 .../common/statemachine/DatanodeStateMachine.java  |  37 +-
 .../common/statemachine/StateContext.java          | 157 ++++--
 .../commandhandler/CommandDispatcher.java          |  16 +
 .../commandhandler/CommandHandler.java             |  10 +
 .../CreatePipelineCommandHandler.java              |  13 +-
 .../commandhandler/DeleteBlocksCommandHandler.java |  11 +
 .../DeleteContainerCommandHandler.java             |   5 +
 .../ReplicateContainerCommandHandler.java          |   5 +
 .../states/datanode/RunningDatanodeState.java      |  16 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |  44 +-
 .../states/endpoint/RegisterEndpointTask.java      |   6 +-
 .../server/ratis/ContainerStateMachine.java        |   7 +-
 .../transport/server/ratis/XceiverServerRatis.java |  18 +-
 .../common/utils/ContainerInspectorUtil.java       |  87 +++
 .../container/common/volume/AbstractFuture.java    |  13 +-
 .../container/keyvalue/KeyValueContainer.java      |  76 ++-
 .../container/keyvalue/KeyValueContainerData.java  |  10 +-
 .../KeyValueContainerMetadataInspector.java        | 463 ++++++++++++++++
 .../ozone/container/keyvalue/KeyValueHandler.java  | 109 ++--
 .../container/keyvalue/TarContainerPacker.java     |  55 +-
 .../container/keyvalue/helpers/ChunkUtils.java     |   4 +
 .../keyvalue/helpers/KeyValueContainerUtil.java    |  23 +-
 .../container/keyvalue/impl/BlockManagerImpl.java  | 114 ++--
 .../keyvalue/impl/ChunkManagerDispatcher.java      |  18 +-
 .../background/BlockDeletingService.java           | 108 ++--
 .../container/metadata/AbstractDatanodeStore.java  |   6 +-
 .../ozone/container/ozoneimpl/ContainerReader.java |   6 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  40 +-
 .../container/replication/MeasuredReplicator.java  |   3 +-
 .../replication/ReplicationSupervisor.java         |   2 +-
 .../commands/RefreshVolumeUsageCommand.java        |  57 ++
 .../hadoop/ozone/container/common/ScmTestMock.java |   7 +-
 .../container/common/TestBlockDeletingService.java |   6 +-
 .../common/TestKeyValueContainerData.java          |   6 +-
 .../TestSchemaOneBackwardsCompatibility.java       |   2 +-
 .../common/helpers/TestDatanodeVersionFile.java    |   4 +-
 .../impl/TestContainerDeletionChoosingPolicy.java  |   2 +-
 .../common/impl/TestContainerPersistence.java      |  47 --
 .../container/common/impl/TestHddsDispatcher.java  |  27 +-
 .../common/statemachine/TestStateContext.java      | 107 +++-
 .../TestCreatePipelineCommandHandler.java          |  36 +-
 .../states/endpoint/TestHeartbeatEndpointTask.java |  26 +-
 .../common/volume/TestStorageVolumeChecker.java    |  12 +-
 .../keyvalue/TestKeyValueBlockIterator.java        |  10 +-
 .../container/keyvalue/TestKeyValueContainer.java  |  67 ++-
 .../keyvalue/TestKeyValueContainerCheck.java       | 158 +-----
 ...a => TestKeyValueContainerIntegrityChecks.java} | 160 ++----
 .../TestKeyValueContainerMetadataInspector.java    | 360 ++++++++++++
 .../container/keyvalue/TestKeyValueHandler.java    |  26 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |  14 -
 .../keyvalue/impl/CommonChunkManagerTestCases.java |   1 -
 .../keyvalue/impl/TestBlockManagerImpl.java        |  57 +-
 .../container/ozoneimpl/TestContainerReader.java   |   2 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   6 +-
 .../replication/TestMeasuredReplicator.java        |  15 +
 .../testutils/BlockDeletingServiceTestImpl.java    |   2 +-
 hadoop-hdds/dev-support/checkstyle/checkstyle.xml  |   2 +
 hadoop-hdds/docs/content/concept/Containers.md     |   3 +-
 hadoop-hdds/docs/content/concept/Datanodes.md      |   4 +-
 hadoop-hdds/docs/content/concept/OzoneManager.md   |   6 +-
 hadoop-hdds/docs/content/concept/Recon.md          |   9 +-
 hadoop-hdds/docs/content/feature/OM-HA.md          |  41 +-
 hadoop-hdds/docs/content/feature/PrefixFSO.md      |  68 +--
 hadoop-hdds/docs/content/feature/SCM-HA.md         |   2 +-
 hadoop-hdds/docs/content/interface/O3fs.md         |   2 +-
 hadoop-hdds/docs/content/interface/O3fs.zh.md      |   4 +-
 hadoop-hdds/docs/content/interface/Ofs.md          |   2 +-
 hadoop-hdds/docs/content/interface/S3.md           |   7 -
 hadoop-hdds/docs/content/interface/S3.zh.md        |   7 -
 .../docs/content/start/StartFromDockerHub.md       |   6 -
 .../docs/content/start/StartFromDockerHub.zh.md    |   5 -
 hadoop-hdds/docs/content/tools/TestTools.md        |   4 +-
 hadoop-hdds/docs/content/tools/TestTools.zh.md     |   4 +-
 hadoop-hdds/docs/dev-support/bin/generate-site.sh  |  12 +-
 .../docs/dev-support/bin/make_images_responsive.py |  57 ++
 .../themes/ozonedoc/layouts/shortcodes/image.html  |   2 +-
 hadoop-hdds/framework/pom.xml                      |   8 +
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |  12 +
 .../SCMSecurityProtocolClientSideTranslatorPB.java |  37 ++
 ...lockLocationProtocolClientSideTranslatorPB.java |   4 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |  24 +-
 .../scm/update/client/UpdateServiceConfig.java     |   5 +-
 .../authority/PKIProfiles/DefaultProfile.java      |  32 +-
 ...ateClient.java => CommonCertificateClient.java} | 116 ++--
 .../certificate/client/OMCertificateClient.java    |  79 +--
 .../certificate/client/ReconCertificateClient.java |  40 +-
 ...va => FixedThreadPoolWithAffinityExecutor.java} |  70 ++-
 .../hadoop/hdds/server/http/HttpServer2.java       |   1 +
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  14 +
 .../hadoop/hdds/utils/db/DBUpdatesWrapper.java     |   9 +
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   1 +
 .../hadoop/hdds/utils/db/cache/EpochEntry.java     |  75 ---
 .../hadoop/hdds/utils/db/cache/FullTableCache.java |  50 +-
 .../hdds/utils/db/cache/PartialTableCache.java     |  56 +-
 .../hadoop/hdds/utils/db/cache/TableCache.java     |   4 +-
 .../client/TestDefaultCertificateClient.java       |   2 +-
 .../hadoop/hdds/server/events/TestEventQueue.java  |  35 +-
 .../hadoop/hdds/server/http/TestHtmlQuoting.java   |   5 +-
 .../hadoop/hdds/utils/db/cache/TestTableCache.java |  77 ++-
 .../src/main/proto/ScmAdminProtocol.proto          |   1 +
 hadoop-hdds/interface-client/pom.xml               |   5 +
 .../src/main/proto/DatanodeClientProtocol.proto    |   4 +-
 .../interface-client/src/main/proto/hdds.proto     |   7 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  17 +-
 .../src/main/proto/ScmServerSecurityProtocol.proto |   8 +-
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |   2 +-
 .../container/AbstractContainerReportHandler.java  |  44 +-
 .../hdds/scm/container/ContainerManager.java       |   7 +
 .../hdds/scm/container/ContainerManagerImpl.java   |  78 +--
 .../hdds/scm/container/ContainerReportHandler.java | 141 +++--
 .../hdds/scm/container/ContainerStateManager.java  |  10 +-
 .../scm/container/ContainerStateManagerImpl.java   |  24 +-
 .../IncrementalContainerReportHandler.java         |  18 +-
 .../hdds/scm/container/ReplicationManager.java     |   4 +-
 .../scm/container/balancer/ContainerBalancer.java  | 526 ++++++++++++------
 .../balancer/ContainerBalancerConfiguration.java   |  23 +-
 .../balancer/ContainerBalancerMetrics.java         | 163 ++++--
 .../IllegalContainerBalancerStateException.java    |  36 +-
 ...lidContainerBalancerConfigurationException.java |  37 +-
 .../scm/container/states/ContainerStateMap.java    |  27 +-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   2 +-
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java |  37 +-
 .../hadoop/hdds/scm/ha/InterSCMGrpcClient.java     |   4 +-
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |   8 +-
 .../hdds/scm/ha/SCMDBCheckpointProvider.java       |   2 +-
 ...ffer.java => SCMHADBTransactionBufferStub.java} |   8 +-
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |   2 +-
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |   3 +-
 ...MockSCMHAManager.java => SCMHAManagerStub.java} |  42 +-
 .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java  |   2 +
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |  14 +-
 .../hadoop/hdds/scm/ha/SCMServiceManager.java      |   3 +-
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  15 +-
 .../hdds/scm/metadata/MoveDataNodePairCodec.java   |   6 +-
 .../hadoop/hdds/scm/metadata/PipelineCodec.java    |   6 +-
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   9 +-
 .../hadoop/hdds/scm/node/DatanodeUsageInfo.java    |  15 +-
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  18 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  18 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  34 +-
 .../hdds/scm/node/states/Node2ObjectsMap.java      |   2 +-
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |   4 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    |   6 +-
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |   4 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  54 +-
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  11 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   6 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   3 +-
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  28 +
 ...inerLocationProtocolServerSideTranslatorPB.java |  26 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |   2 +-
 .../safemode/OneReplicaPipelineSafeModeRule.java   |   2 +-
 .../scm/server/OzoneStorageContainerManager.java   |   2 +
 .../hdds/scm/server/SCMClientProtocolServer.java   |  54 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |  20 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  12 +-
 .../hadoop/hdds/scm/server/SCMPolicyProvider.java  |   5 +-
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  16 +-
 .../hadoop/hdds/scm/server/SCMStorageConfig.java   |   2 +-
 .../hdds/scm/server/StorageContainerManager.java   |  62 ++-
 .../main/resources/webapps/scm/scm-overview.html   |   1 +
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |  14 +-
 .../hadoop/hdds/scm/TestHddsServerUtils.java       |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   7 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |   8 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  21 +
 .../hdds/scm/container/SimpleMockNodeManager.java  |  20 +-
 .../container/TestCloseContainerEventHandler.java  |   8 +-
 .../scm/container/TestContainerManagerImpl.java    |  26 +-
 .../scm/container/TestContainerReportHandler.java  |  34 +-
 .../scm/container/TestContainerStateManager.java   |  14 +-
 .../TestIncrementalContainerReportHandler.java     |  30 +-
 .../hdds/scm/container/TestReplicationManager.java | 140 ++---
 .../scm/container/TestUnknownContainerReport.java  |   4 +-
 .../container/balancer/TestContainerBalancer.java  | 233 +++++---
 .../states/TestContainerReplicaCount.java          |   5 +-
 .../hdds/scm/ha/TestReplicationAnnotation.java     |  15 +-
 .../hdds/scm/ha/TestSequenceIDGenerator.java       |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   6 +-
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   7 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   3 +-
 .../hadoop/hdds/scm/node/TestNodeStateManager.java |  24 +
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  12 +-
 .../hdds/scm/node/TestSCMNodeStorageStatMap.java   |   5 +-
 .../hdds/scm/pipeline/MockPipelineManager.java     |   4 +-
 .../TestPipelineDatanodesIntersection.java         |  13 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 119 ++--
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 158 +++++-
 .../scm/pipeline/TestPipelineStateManagerImpl.java |  62 ++-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  50 +-
 .../scm/pipeline/TestSimplePipelineProvider.java   |  18 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  20 +-
 .../TestOneReplicaPipelineSafeModeRule.java        |  10 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  22 +-
 .../scm/server/TestSCMBlockProtocolServer.java     |  10 +-
 .../server/TestSCMDatanodeHeartbeatDispatcher.java |  54 +-
 .../server/TestStorageContainerManagerStarter.java |   6 +-
 .../TestSCMHAUnfinalizedStateValidationAction.java |   8 +
 .../testutils/ReplicationNodeManagerMock.java      |  14 +
 hadoop-hdds/test-utils/pom.xml                     |   5 +
 .../org/apache/ozone/test/GenericTestUtils.java    |  18 +-
 .../java/org/apache/ozone/test/tag/Flaky.java}     |  36 +-
 .../main/java/org/apache/ozone/test/tag/Slow.java} |  35 +-
 .../org/apache/ozone/test/tag/package-info.java}   |  23 +-
 .../scm/cli/ContainerBalancerStartSubcommand.java  |  18 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |  17 +-
 .../hadoop/hdds/scm/cli/cert/CertCommands.java     |   4 +-
 .../hdds/scm/cli/container/InfoSubcommand.java     |   7 +-
 .../hdds/scm/cli/datanode/ListInfoSubcommand.java  |  38 +-
 .../hdds/scm/cli/container/TestInfoSubCommand.java |   6 +-
 .../datanode/TestContainerBalancerSubCommand.java  |  22 +-
 .../apache/hadoop/ozone/client/ObjectStore.java    |   2 +-
 .../apache/hadoop/ozone/client/OzoneBucket.java    |   1 +
 .../hadoop/ozone/client/OzoneClientFactory.java    |   5 +-
 .../client/checksum/BaseFileChecksumHelper.java    |  54 +-
 .../checksum/ReplicatedFileChecksumHelper.java     |  44 +-
 .../client/io/BlockOutputStreamEntryPool.java      |   1 -
 .../client/io/MultipartCryptoKeyInputStream.java   |   4 +
 .../ozone/client/protocol/ClientProtocol.java      |   5 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  61 +-
 .../checksum/TestReplicatedFileChecksumHelper.java |  39 +-
 .../hadoop/ozone/client/rpc/RpcClientTest.java     | 279 +++++-----
 .../hadoop/ozone/client/rpc/TestOzoneKMSUtil.java  |   3 +-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   4 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  21 +-
 .../ozone/om/ha/OMFailoverProxyProvider.java       | 180 +++---
 .../apache/hadoop/ozone/om/helpers/DBUpdates.java  |  10 +
 .../hadoop/ozone/om/helpers/OmDirectoryInfo.java   |   7 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  16 +-
 .../ozone/om/helpers/OmKeyLocationInfoGroup.java   |   3 +-
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      |   4 +-
 .../hadoop/ozone/om/helpers/ServiceInfo.java       |  25 +-
 .../apache/hadoop/ozone/om/lock/LockUsageInfo.java |  63 +++
 .../apache/hadoop/ozone/om/lock/OMLockMetrics.java | 207 +++++++
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     | 273 ++++++++-
 .../RangerRestMultiTenantAccessController.java     | 357 ++++++------
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    |   3 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  32 +-
 .../apache/hadoop/ozone/protocolPB/OMPBHelper.java |  37 +-
 .../apache/hadoop/ozone/security/acl/OzoneObj.java |   3 +-
 .../apache/hadoop/ozone/web/utils/OzoneUtils.java  |  18 +-
 .../org/apache/hadoop/ozone/TestOzoneAcls.java     |  10 +-
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   |   8 +-
 .../hadoop/ozone/om/helpers/TestOmKeyInfo.java     |  12 +-
 .../ozone/om/helpers/TestOmMultipartKeyInfo.java   |   4 +-
 .../hadoop/ozone/om/lock/TestOzoneManagerLock.java | 227 ++++++++
 .../ozone/security/acl/TestOzoneObjInfo.java       |   5 +-
 hadoop-ozone/csi/pom.xml                           |   4 +
 hadoop-ozone/datanode/pom.xml                      |   1 +
 hadoop-ozone/dev-support/checks/_lib.sh            |   2 +-
 .../dev-support/checks/_mvn_unit_report.sh         |  24 +-
 hadoop-ozone/dev-support/checks/acceptance.sh      |   3 +-
 hadoop-ozone/dev-support/checks/integration.sh     |   2 +-
 hadoop-ozone/dev-support/checks/rat.sh             |   4 +-
 .../dist/dev-support/bin/dist-layout-stitching     |   1 +
 hadoop-ozone/dist/pom.xml                          |   2 +-
 .../dist/src/main/compose/ozone-topology/test.sh   |   6 -
 .../dist/src/main/compose/ozone/docker-config      |   4 +
 .../dist/src/main/compose/ozone/prometheus.yml     |   1 +
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |   6 +-
 .../src/main/compose/ozonesecure-ha/docker-config  |   2 +-
 .../src/main/compose/ozonesecure/docker-config     |   4 +
 .../dist/src/main/compose/ozonesecure/test.sh      |   2 +-
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  86 ++-
 .../dist/src/main/compose/upgrade/testlib.sh       |   9 +-
 .../non-rolling-upgrade/1.1.0-1.2.0/callback.sh    |   6 +
 .../dist/src/main/compose/xcompat/clients.yaml     |  18 +
 .../dist/src/main/compose/xcompat/docker-config    |   3 +-
 hadoop-ozone/dist/src/main/compose/xcompat/test.sh |  17 +-
 hadoop-ozone/dist/src/main/k8s/README.md           |  68 +++
 .../k8s/definitions/ozone-csi/csi-controller.yaml  |   2 +-
 .../main/k8s/examples/getting-started/Flekszible   |   2 +
 .../examples/getting-started/config-configmap.yaml |   1 +
 .../examples/getting-started/kustomization.yaml}   |  17 +-
 .../dist/src/main/k8s/examples/minikube/Flekszible |   2 +
 .../k8s/examples/minikube/config-configmap.yaml    |   1 +
 .../main/k8s/examples/minikube/kustomization.yaml} |  17 +-
 .../src/main/k8s/examples/ozone-dev/Flekszible     |   1 +
 .../k8s/examples/ozone-dev/config-configmap.yaml   |   1 +
 .../k8s/examples/ozone-dev/kustomization.yaml}     |  26 +-
 .../dist/src/main/k8s/examples/ozone-ha/Flekszible |   3 +
 .../main/k8s/examples/ozone-ha/kustomization.yaml} |  13 +-
 .../dist/src/main/k8s/examples/ozone/Flekszible    |   3 +-
 .../main/k8s/examples/ozone/config-configmap.yaml  |   1 +
 .../main/k8s/examples/ozone/kustomization.yaml}    |  13 +-
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   1 +
 .../src/main/smoketest/admincli/datanode.robot     |   6 +-
 .../main/smoketest/auditparser/auditparser.robot   |  22 +-
 .../dist/src/main/smoketest/basic/basic.robot      |   9 +-
 .../debug/ozone-debug-corrupt-block.robot          |  49 ++
 .../ozone-debug-dead-datanode.robot}               |  35 +-
 .../debug/ozone-debug-stale-datanode.robot         |  48 ++
 .../main/smoketest/debug/ozone-debug-tests.robot   |  51 ++
 .../src/main/smoketest/debug/ozone-debug.robot     |  93 +++-
 .../dist/src/main/smoketest/freon/generate.robot   |  19 +-
 .../dist/src/main/smoketest/freon/remove.robot     |  21 +-
 .../dist/src/main/smoketest/freon/validate.robot   |  13 +-
 .../dist/src/main/smoketest/omha/om-prepare.robot  |   3 +-
 .../dist/src/main/smoketest/ozone-lib/freon.robot  |  65 +++
 .../dist/src/main/smoketest/recon/recon-api.robot  |   6 +-
 .../src/main/smoketest/s3/MultipartUpload.robot    |  21 +-
 .../dist/src/main/smoketest/s3/bucketlist.robot    |  15 +-
 .../dist/src/main/smoketest/s3/commonawslib.robot  |  36 ++
 .../dist/src/main/smoketest/spnego/web.robot       |   4 -
 .../dist/src/main/smoketest/upgrade/generate.robot |  10 +-
 .../src/shell/conf/s3g-audit-log4j2.properties     |  90 +++
 hadoop-ozone/dist/src/shell/ozone/ozone            |   1 +
 .../fault-injection-test/mini-chaos-tests/pom.xml  |  25 +
 .../hadoop/ozone/TestAllMiniChaosOzoneCluster.java |   2 +-
 .../hadoop/ozone/insight/TestBaseInsightPoint.java |   7 +-
 hadoop-ozone/integration-test/pom.xml              |   5 +
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |  26 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |   2 +-
 .../fs/ozone/TestOzoneFileSystemMissingParent.java |   3 +
 .../fs/ozone/TestOzoneFileSystemWithLinks.java     |  14 +-
 .../hadoop/fs/ozone/TestRootedDDSWithFSO.java      | 245 ++++++++
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |   8 +-
 .../fs/ozone/TestRootedOzoneFileSystemWithFSO.java |  48 ++
 .../rooted/ITestRootedOzoneContractRootDir.java    |   1 -
 .../hadoop/hdds/scm/TestRatisPipelineLeader.java   |  23 +-
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   7 +-
 .../hadoop/hdds/scm/TestSCMInstallSnapshot.java    |  19 +-
 .../apache/hadoop/hdds/scm/TestSCMSnapshot.java    |   5 +-
 .../TestContainerStateManagerIntegration.java      | 127 ++---
 .../metrics/TestSCMContainerManagerMetrics.java    |   4 +-
 .../hdds/scm/pipeline/TestLeaderChoosePolicy.java  |  28 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java    |   2 +-
 .../hadoop/hdds/scm/pipeline/TestNodeFailure.java  |   2 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |  29 +-
 .../TestRatisPipelineCreateAndDestroy.java         |  31 +-
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   6 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   7 +-
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       |  33 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   8 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  20 +-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   2 +-
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |   5 +-
 ...OutputUtil.java => StandardOutputTestBase.java} |   2 +-
 .../hadoop/ozone/TestContainerOperations.java      |  35 +-
 .../apache/hadoop/ozone/TestDelegationToken.java   |   8 +
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  36 +-
 .../hadoop/ozone/TestMiniOzoneOMHACluster.java     |   8 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   6 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  13 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |   5 +-
 .../ozone/client/TestOzoneClientFactory.java       |  75 +++
 .../apache/hadoop/ozone/client/rpc/TestBCSID.java  |   7 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  23 +-
 ...estBlockOutputStreamWithFailuresFlushDelay.java |  23 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |  14 +-
 .../client/rpc/TestContainerStateMachine.java      |   4 +-
 .../TestContainerStateMachineFailureOnRead.java    |  21 +-
 .../rpc/TestContainerStateMachineFailures.java     |  13 +-
 .../rpc/TestContainerStateMachineFlushDelay.java   |  14 +-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  10 +-
 .../client/rpc/TestFailureHandlingByClient.java    | 144 ++++-
 .../rpc/TestFailureHandlingByClientFlushDelay.java |   8 +-
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |  10 +-
 .../client/rpc/TestOzoneAtRestEncryption.java      |  28 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java       |  18 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  33 +-
 .../rpc/TestOzoneRpcClientForAclAuditLog.java      |   3 +-
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |  10 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |  24 +-
 .../ozone/client/rpc/TestWatchForCommit.java       |  12 +-
 .../ozone/client/rpc/read/TestKeyInputStream.java  |   2 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../apache/hadoop/ozone/container/TestHelper.java  |   8 +-
 .../commandhandler/TestBlockDeletion.java          |  21 +-
 .../TestCloseContainerByPipeline.java              |  24 +-
 .../commandhandler/TestCloseContainerHandler.java  |   2 +-
 .../commandhandler/TestDeleteContainerHandler.java |   2 +-
 ...ler.java => TestRefreshVolumeUsageHandler.java} | 121 ++--
 .../transport/server/ratis/TestCSMMetrics.java     |   3 +-
 .../container/ozoneimpl/TestOzoneContainer.java    | 152 ++---
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |   6 +-
 .../container/server/TestContainerServer.java      |  23 +-
 .../server/TestSecureContainerServer.java          |  10 -
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java |  13 +-
 .../hadoop/ozone/fsck/TestContainerMapper.java     |  16 +-
 .../hadoop/ozone/om/TestAddRemoveOzoneManager.java |  19 +-
 .../apache/hadoop/ozone/om/TestBucketOwner.java    |  27 +-
 .../ozone/om/TestContainerReportWithKeys.java      |   7 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  39 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |   2 -
 .../apache/hadoop/ozone/om/TestObjectStore.java    |  12 +-
 .../hadoop/ozone/om/TestObjectStoreWithFSO.java    |  81 +++
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |  77 ++-
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |  11 +-
 .../org/apache/hadoop/ozone/om/TestOmInit.java     |   7 +-
 .../org/apache/hadoop/ozone/om/TestOmLDBCli.java   |   5 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  25 +-
 .../ozone/om/TestOzoneManagerHAKeyDeletion.java    |   2 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |  40 +-
 .../hadoop/ozone/om/TestOzoneManagerHAWithACL.java |   2 +-
 .../ozone/om/TestOzoneManagerHAWithData.java       |  62 +--
 .../ozone/om/TestOzoneManagerHAWithFailover.java   |   4 +-
 .../ozone/om/TestOzoneManagerListVolumes.java      |   2 -
 .../hadoop/ozone/om/TestOzoneManagerPrepare.java   |  14 +-
 .../hadoop/ozone/om/TestOzoneManagerRestart.java   |  49 +-
 .../apache/hadoop/ozone/om/TestScmSafeMode.java    |   2 +-
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |   9 +-
 .../om/multitenant/TestMultiTenantVolume.java      |   2 +-
 .../om/ratis/TestOzoneManagerRatisRequest.java     |  35 +-
 .../hadoop/ozone/recon/TestReconAsPassiveScm.java  |   8 +-
 .../hadoop/ozone/recon/TestReconScmSnapshot.java   |   2 +-
 .../apache/hadoop/ozone/recon/TestReconTasks.java  |   2 +-
 .../ozone/recon/TestReconWithOzoneManager.java     |   7 +-
 .../ozone/recon/TestReconWithOzoneManagerHA.java   |   2 +-
 .../hadoop/ozone/scm/TestFailoverWithSCMHA.java    |   5 +-
 .../TestSCMContainerPlacementPolicyMetrics.java    |  18 +-
 .../ozone/scm/TestSCMInstallSnapshotWithHA.java    |   4 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   4 +-
 .../ozone/scm/TestStorageContainerManagerHA.java   |   4 +-
 .../hadoop/ozone/scm/TestXceiverClientGrpc.java    |   2 +-
 .../scm/node/TestDecommissionAndMaintenance.java   |  27 +-
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |   2 +-
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |   4 +-
 hadoop-ozone/interface-client/pom.xml              |   4 +
 .../src/main/proto/OmClientProtocol.proto          |   3 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   9 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |   6 +-
 .../ozone/om/codec/RepeatedOmKeyInfoCodec.java     |   6 +-
 .../hadoop/ozone/om/codec/TestOmKeyInfoCodec.java  |   4 +-
 .../om/codec/TestOmMultipartKeyInfoCodec.java      |   3 +-
 .../ozone/om/codec/TestRepeatedOmKeyInfoCodec.java |   3 +-
 hadoop-ozone/ozone-manager/pom.xml                 |   5 +
 .../org/apache/hadoop/ozone/om/BucketManager.java  |  22 -
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  | 255 ---------
 .../hadoop/ozone/om/DirectoryDeletingService.java  |   7 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   9 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 169 +++---
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  31 ++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     | 173 ++++--
 .../hadoop/ozone/om/OpenKeyCleanupService.java     |   3 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  38 +-
 .../apache/hadoop/ozone/om/OzoneManagerUtils.java  |  94 +++-
 .../hadoop/ozone/om/OzonePrefixPathImpl.java       |  20 +
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |   1 +
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |  10 +-
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |  41 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  17 +-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  18 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |  77 ++-
 .../BucketLayoutAwareOMKeyRequestFactory.java      | 314 +++++++++++
 .../hadoop/ozone/om/request/OMClientRequest.java   |  15 +-
 .../ozone/om/request/OMClientRequestUtils.java     |  50 ++
 .../ozone/om/request/OMKeyRequestFactory.java      | 139 -----
 .../om/request/bucket/OMBucketCreateRequest.java   |  18 +-
 .../om/request/bucket/OMBucketDeleteRequest.java   |   9 +-
 .../om/request/bucket/acl/OMBucketAclRequest.java  |   4 +-
 .../om/request/file/OMDirectoryCreateRequest.java  |  19 -
 .../file/OMDirectoryCreateRequestWithFSO.java      |   5 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |  17 -
 .../ozone/om/request/file/OMFileRequest.java       |  60 +-
 .../om/request/key/OMAllocateBlockRequest.java     |  14 -
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  67 +--
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  24 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |  18 -
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |  15 -
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |   6 +-
 .../ozone/om/request/key/OMKeyRenameRequest.java   |  19 -
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |   2 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  45 +-
 .../ozone/om/request/key/OMKeysDeleteRequest.java  | 137 +++--
 .../ozone/om/request/key/OMKeysRenameRequest.java  |   5 +-
 .../om/request/key/OMOpenKeysDeleteRequest.java    |  48 +-
 .../om/request/key/OmKeysDeleteRequestWithFSO.java | 141 +++++
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |   2 +-
 .../om/request/key/acl/OMKeyAclRequestWithFSO.java |   2 +-
 .../S3InitiateMultipartUploadRequest.java          |  15 -
 .../multipart/S3MultipartUploadAbortRequest.java   |  15 -
 .../S3MultipartUploadCommitPartRequest.java        |  15 -
 .../S3MultipartUploadCompleteRequest.java          |  87 ++-
 .../S3MultipartUploadCompleteRequestWithFSO.java   |  15 +-
 .../om/request/s3/security/OMSetSecretRequest.java |   5 +-
 .../security/OMCancelDelegationTokenRequest.java   |  42 +-
 .../security/OMGetDelegationTokenRequest.java      |  54 +-
 .../security/OMRenewDelegationTokenRequest.java    |  51 +-
 .../validation/RequestFeatureValidator.java        |  99 ++++
 .../request/validation/RequestProcessingPhase.java |  29 +-
 .../om/request/validation/RequestValidations.java  | 107 ++++
 .../om/request/validation/ValidationCondition.java |  55 ++
 .../om/request/validation/ValidationContext.java   |  52 ++
 .../om/request/validation/ValidatorRegistry.java   | 201 +++++++
 .../ozone/om/request/validation/package-info.java  |  62 +++
 .../hadoop/ozone/om/response/OMClientResponse.java |  16 -
 .../om/response/bucket/OMBucketCreateResponse.java |   3 +-
 .../om/response/bucket/OMBucketDeleteResponse.java |   3 +-
 .../response/file/OMFileCreateResponseWithFSO.java |   4 +-
 .../om/response/key/OMAllocateBlockResponse.java   |   3 +-
 .../key/OMAllocateBlockResponseWithFSO.java        |   3 +-
 .../ozone/om/response/key/OMKeyCommitResponse.java |   4 +-
 .../response/key/OMKeyCommitResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyCreateResponse.java |   3 +-
 .../response/key/OMKeyCreateResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |   3 +-
 .../response/key/OMKeyDeleteResponseWithFSO.java   |   3 +-
 .../response/key/OMKeyRenameResponseWithFSO.java   |   3 +-
 .../om/response/key/OMKeysDeleteResponse.java      |  12 +-
 ...thFSO.java => OMKeysDeleteResponseWithFSO.java} |  76 ++-
 .../om/response/key/OMKeysRenameResponse.java      |  10 +
 .../om/response/key/OMOpenKeysDeleteResponse.java  |  18 +-
 .../multipart/S3MultipartUploadAbortResponse.java  |   9 +-
 .../S3MultipartUploadCommitPartResponse.java       |   9 +-
 .../S3MultipartUploadCompleteResponse.java         |  36 +-
 .../S3MultipartUploadCompleteResponseWithFSO.java  |   9 +-
 .../om/snapshot/OzoneManagerSnapshotProvider.java  |   9 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  25 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   2 +
 .../org/apache/hadoop/ozone/om/OmTestManagers.java |  23 +-
 .../ozone/om/ScmBlockLocationTestingClient.java    |  14 +-
 .../hadoop/ozone/om/TestBucketManagerImpl.java     | 243 ++++----
 .../hadoop/ozone/om/TestKeyDeletingService.java    | 142 +++--
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  23 +-
 .../hadoop/ozone/om/TestOmMetadataManager.java     |  50 +-
 .../hadoop/ozone/om/TestOzoneManagerStarter.java   |   4 +-
 .../apache/hadoop/ozone/om/TestTrashService.java   |   6 +-
 ...tOzoneManagerDoubleBufferWithDummyResponse.java |   1 +
 ...TestOzoneManagerDoubleBufferWithOMResponse.java |   7 +-
 .../om/ratis/TestOzoneManagerRatisServer.java      |   8 +
 .../om/ratis/TestOzoneManagerStateMachine.java     |   1 +
 .../ozone/om/request/OMRequestTestUtils.java       |  16 +
 .../request/TestBucketLayoutAwareOMKeyFactory.java | 166 ++++++
 .../bucket/TestOMBucketCreateRequestWithFSO.java   |   4 +
 .../bucket/TestOMBucketDeleteRequestWithFSO.java   |  76 +++
 .../request/file/TestOMDirectoryCreateRequest.java |  37 +-
 .../file/TestOMDirectoryCreateRequestWithFSO.java  |  16 +-
 .../om/request/file/TestOMFileCreateRequest.java   |   6 +-
 .../file/TestOMFileCreateRequestWithFSO.java       |   2 +-
 .../om/request/key/TestOMKeyCreateRequest.java     |   7 +-
 .../request/key/TestOMKeyDeleteRequestWithFSO.java |  97 +++-
 .../om/request/key/TestOMKeyRenameRequest.java     |  14 +-
 .../ozone/om/request/key/TestOMKeyRequest.java     |   2 +-
 .../om/request/key/TestOMKeysDeleteRequest.java    |  35 +-
 .../key/TestOMKeysDeleteRequestWithFSO.java        | 109 ++++
 .../om/request/key/TestOMKeysRenameRequest.java    |   4 +-
 .../request/key/TestOMOpenKeysDeleteRequest.java   | 178 +++---
 .../TestS3InitiateMultipartUploadRequest.java      |   2 +-
 ...estS3InitiateMultipartUploadRequestWithFSO.java |   2 +-
 .../TestS3MultipartUploadAbortRequest.java         |   4 +-
 .../TestS3MultipartUploadAbortRequestWithFSO.java  |   5 +
 .../TestS3MultipartUploadCommitPartRequest.java    |   6 +-
 .../TestS3MultipartUploadCompleteRequest.java      |  40 +-
 ...estS3MultipartUploadCompleteRequestWithFSO.java |   7 +-
 .../security/TestOMGetDelegationTokenRequest.java  |  10 +-
 .../upgrade/TestOMCancelPrepareRequest.java        |   2 +-
 .../TestRequestFeatureValidatorProcessor.java      | 524 +++++++++++++++++
 .../request/validation/TestRequestValidations.java | 349 ++++++++++++
 .../request/validation/TestValidatorRegistry.java  | 215 +++++++
 .../GeneralValidatorsForTesting.java               | 190 +++++++
 .../ValidatorsForOnlyOldClientValidations.java     |  43 ++
 .../ozone/om/response/TestCleanupTableInfo.java    |   6 +-
 .../om/response/key/TestOMKeyDeleteResponse.java   |   3 +-
 .../response/key/TestOMOpenKeysDeleteResponse.java |  61 +-
 .../s3/multipart/TestS3MultipartResponse.java      |  19 +-
 ...stS3MultipartUploadCompleteResponseWithFSO.java |  17 +-
 .../ozone/security/TestAWSV4AuthValidator.java     |   2 +-
 .../security/acl/TestOzoneNativeAuthorizer.java    |   8 +-
 .../hadoop/ozone/security/acl/TestParentAcl.java   |   2 +-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java |   2 +-
 .../fs/ozone/BasicRootedOzoneFileSystem.java       | 133 ++++-
 .../apache/hadoop/fs/ozone/OzoneClientUtils.java   |   2 +-
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |   1 +
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      |   3 +-
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |   1 +
 .../org.apache.hadoop.security.token.DtFetcher}    |   2 +-
 ...g.apache.hadoop.security.token.TokenIdentifier} |   6 +-
 .../org.apache.hadoop.security.token.TokenRenewer  |   1 +
 hadoop-ozone/ozonefs-shaded/pom.xml                |  11 +
 .../org.apache.hadoop.security.token.TokenRenewer  |   1 +
 hadoop-ozone/pom.xml                               |   2 +-
 hadoop-ozone/recon-codegen/pom.xml                 |   4 +
 hadoop-ozone/recon/pom.xml                         |   2 +-
 .../hadoop/ozone/recon/ReconControllerModule.java  |   2 +
 .../org/apache/hadoop/ozone/recon/ReconServer.java | 125 +++++
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  29 +
 .../ozone/recon/api/ClusterStateEndpoint.java      |  15 +-
 .../recon/metrics/OzoneManagerSyncMetrics.java     |  12 +
 .../recon/metrics/ReconTaskStatusMetrics.java      |  83 +++
 .../hadoop/ozone/recon/scm/ReconNodeManager.java   |  20 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |   4 +-
 .../hadoop/ozone/recon/scm/ReconStorageConfig.java |  61 +-
 .../scm/ReconStorageContainerManagerFacade.java    |  18 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |  14 +-
 .../impl/StorageContainerServiceProviderImpl.java  |  20 +-
 .../hadoop/ozone/recon/tasks/OMDBUpdateEvent.java  |   2 +-
 .../ozone/recon/tasks/OMDBUpdatesHandler.java      |  37 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |  12 +-
 .../ozone/recon/api/TestContainerEndpoint.java     |   2 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |   7 +-
 .../ozone/recon/api/TestOpenContainerCount.java    |  16 +-
 .../recon/fsck/TestContainerHealthStatus.java      |   4 +-
 .../ozone/recon/fsck/TestContainerHealthTask.java  |   6 +-
 .../TestContainerHealthTaskRecordGenerator.java    |   3 +-
 .../recovery/TestReconOmMetadataManagerImpl.java   |   4 +-
 .../scm/AbstractReconContainerManagerTest.java     |  20 +-
 .../ozone/recon/scm/TestReconContainerManager.java |   2 +-
 .../ozone/recon/scm/TestReconNodeManager.java      |  12 +-
 .../ozone/recon/scm/TestReconPipelineManager.java  |  15 +-
 .../impl/TestReconNamespaceSummaryManagerImpl.java |   6 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |   4 +-
 .../ozone/recon/tasks/TestNSSummaryTask.java       |   4 +-
 .../ozone/recon/tasks/TestOMDBUpdatesHandler.java  | 256 ++++++---
 hadoop-ozone/s3gateway/pom.xml                     |   8 +
 .../org/apache/hadoop/ozone/s3/ClientIpFilter.java |  69 +++
 .../java/org/apache/hadoop/ozone/s3/Gateway.java   |   9 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  24 +-
 .../hadoop/ozone/s3/S3GatewayHttpServer.java       |  19 +
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   | 232 +++++---
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |  69 ++-
 .../ozone/s3/endpoint/ListBucketResponse.java      |   6 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 161 +++++-
 .../hadoop/ozone/s3/endpoint/RootEndpoint.java     |  46 +-
 .../hadoop/ozone/s3/exception/S3ErrorTable.java    |   9 +
 .../hadoop/ozone/s3/metrics/S3GatewayMetrics.java  | 368 ++++++++++++
 .../hadoop/ozone/s3/metrics/package-info.java      |  23 +-
 .../org/apache/hadoop/ozone/s3/util/S3Utils.java   |  20 +
 .../s3gateway/src/main/resources/browser.html      | 617 ---------------------
 .../hadoop/ozone/client/OzoneBucketStub.java       |   3 +-
 .../hadoop/ozone/s3/TestS3GatewayAuditLog.java     | 158 ++++++
 .../hadoop/ozone/s3/endpoint/TestBucketAcl.java    |   2 +-
 .../hadoop/ozone/s3/endpoint/TestBucketList.java   |  36 +-
 .../ozone/s3/endpoint/TestPermissionCheck.java     |   4 +-
 .../ozone/s3/metrics/TestS3GatewayMetrics.java     | 284 ++++++++++
 .../src/test/resources/auditlog.properties         |  76 +++
 .../apache/hadoop/ozone/debug/ChunkKeyHandler.java |   3 +-
 .../apache/hadoop/ozone/debug/ReadReplicas.java    |   5 +-
 .../ozone/freon/LeaderAppendLogEntryGenerator.java |   2 +-
 .../apache/hadoop/ozone/freon/OmKeyGenerator.java  |   2 +-
 .../hadoop/ozone/freon/SCMThroughputBenchmark.java |   2 +-
 .../freon/containergenerator/GeneratorOm.java      |   5 +-
 .../freon/containergenerator/GeneratorScm.java     |   4 +-
 .../apache/hadoop/ozone/fsck/ContainerMapper.java  |   5 +-
 .../ozone/shell/bucket/InfoBucketHandler.java      |   6 +
 .../apache/hadoop/ozone/freon/TestProgressBar.java |   6 +-
 pom.xml                                            |  92 ++-
 688 files changed, 16956 insertions(+), 6632 deletions(-)

diff --cc 
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index dca3efa82b,043028347c..19d508945b
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@@ -466,38 -450,7 +466,42 @@@ public final class OzoneConsts 
    public static final String OZONE_HTTP_FILTER_INITIALIZERS_SECURE =
        "org.apache.hadoop.security.AuthenticationFilterInitializer";
  
+   public static final String DELEGATION_TOKEN_KIND = "kind";
+   public static final String DELEGATION_TOKEN_SERVICE = "service";
+   public static final String DELEGATION_TOKEN_RENEWER = "renewer";
++
 +  public static final String OZONE_OM_RANGER_ADMIN_CREATE_USER_HTTP_ENDPOINT =
 +      "/service/xusers/secure/users";
 +
 +  // Ideally we should use /addUsersAndGroups endpoint for add user to role,
 +  // but it always return 405 somehow.
 +  // https://ranger.apache.org/apidocs/resource_RoleREST.html
 +  // #resource_RoleREST_addUsersAndGroups_PUT
 +  public static final String 
OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT =
 +      "/service/roles/roles/";
 +
 +  public static final String OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT =
 +      "/service/xusers/users/?name=";
 +
 +  public static final String OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT =
 +      "/service/xusers/secure/users/id/";
 +
 +  public static final String OZONE_OM_RANGER_ADMIN_CREATE_ROLE_HTTP_ENDPOINT =
 +      "/service/roles/roles";
 +
 +  public static final String OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT =
 +      "/service/roles/roles/name/";
 +
 +  // TODO: Use delete role endpoint
 +  public static final String OZONE_OM_RANGER_ADMIN_DELETE_GROUP_HTTP_ENDPOINT 
=
 +      "/service/xusers/secure/groups/id/";
 +
 +  public static final String 
OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT =
 +      "/service/public/v2/api/policy";
 +
 +  public static final String OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT =
 +      "/service/public/v2/api/policy/?policyName=";
 +
 +  public static final String 
OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT =
 +      "/service/plugins/policies/";
  }
diff --cc 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
index 59e07bc5c9,2412b889fa..a38c80e153
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
@@@ -160,29 -148,8 +160,29 @@@ public class ObjectStore 
     * @throws IOException
     */
    public OzoneVolume getVolume(String volumeName) throws IOException {
 -    OzoneVolume volume = proxy.getVolumeDetails(volumeName);
 -    return volume;
 +    return proxy.getVolumeDetails(volumeName);
 +  }
 +
 +  public OzoneVolume getS3Volume() throws IOException {
 +    final S3VolumeContext resp = proxy.getS3VolumeContext();
 +
 +    S3Auth s3Auth = proxy.getThreadLocalS3Auth();
 +    // Update user principal if needed to be used for KMS client
 +    if (s3Auth != null) {
 +      // Update userPrincipal field with the value returned from OM. So that
 +      //  in multi-tenancy, KMS client can use the correct identity
 +      //  (instead of using accessId) to communicate with KMS.
 +      LOG.debug("Updating S3Auth.userPrincipal to {}", 
resp.getUserPrincipal());
 +      s3Auth.setUserPrincipal(resp.getUserPrincipal());
-       proxy.setTheadLocalS3Auth(s3Auth);
++      proxy.setThreadLocalS3Auth(s3Auth);
 +    }
 +
 +    OmVolumeArgs volume = resp.getOmVolumeArgs();
 +    return proxy.buildOzoneVolume(volume);
 +  }
 +
 +  public S3VolumeContext getS3VolumeContext() throws IOException {
 +    return proxy.getS3VolumeContext();
    }
  
    public S3SecretValue getS3Secret(String kerberosID) throws IOException {
diff --cc 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 41f598d842,c38dbc753f..13afb39743
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@@ -292,30 -307,9 +307,34 @@@ public final class OMConfigKeys 
    public static final long 
OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT
        = 1000;
  
+   public static final String OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT =
+       "ozone.om.unflushed.transaction.max.count";
+   public static final int OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT
+       = 10000;
+ 
 +  /**
 +   * Temporary configuration properties for Ranger REST use in multitenancy.
 +   */
 +  public static final String OZONE_RANGER_OM_IGNORE_SERVER_CERT =
 +      "ozone.om.ranger.ignore.cert";
 +  public static final boolean OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT =
 +      true;
 +  public static final String OZONE_RANGER_OM_CONNECTION_TIMEOUT =
 +      "ozone.om.ranger.connection.timeout";
 +  public static final String OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT = 
"5s";
 +  public static final String OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT =
 +      "ozone.om.ranger.connection.request.timeout";
 +  public static final String
 +      OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "5s";
 +  public static final String OZONE_OM_RANGER_HTTPS_ADMIN_API_USER =
 +      "ozone.om.ranger.https.admin.api.user";
 +  // TODO: Note this should be removed once Ranger Java Client is in place.
 +  //  And Ranger SPNEGO auth (ranger.spnego.kerberos.principal ?) should be 
used
 +  //  instead. Or keep this solely for dev testing. See HDDS-5836.
 +  public static final String OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD =
 +      "ozone.om.ranger.https.admin.api.passwd";
 +  public static final String OZONE_RANGER_HTTPS_ADDRESS_KEY =
 +      "ozone.om.ranger.https-address";
 +  public static final String OZONE_RANGER_SERVICE =
 +      "ozone.om.ranger.service";
- 
  }
diff --cc 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
index 8931c625f6,0000000000..7184ea301d
mode 100644,000000..100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
@@@ -1,631 -1,0 +1,636 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +package org.apache.hadoop.ozone.om.multitenant;
 +
 +import com.google.gson.Gson;
 +import com.google.gson.GsonBuilder;
 +import com.google.gson.JsonArray;
 +import com.google.gson.JsonDeserializationContext;
 +import com.google.gson.JsonDeserializer;
 +import com.google.gson.JsonElement;
 +import com.google.gson.JsonObject;
 +import com.google.gson.JsonParseException;
 +import com.google.gson.JsonParser;
 +import com.google.gson.JsonPrimitive;
 +import com.google.gson.JsonSerializationContext;
 +import com.google.gson.JsonSerializer;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 +import org.apache.http.auth.BasicUserPrincipal;
 +import org.apache.kerby.util.Base64;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import javax.net.ssl.HttpsURLConnection;
 +import javax.net.ssl.SSLContext;
 +import javax.net.ssl.TrustManager;
 +import javax.net.ssl.X509TrustManager;
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.io.OutputStream;
 +import java.lang.reflect.Type;
 +import java.net.URL;
 +import java.nio.charset.StandardCharsets;
 +import java.util.Collection;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.concurrent.TimeUnit;
 +
- import static org.apache.hadoop.ozone.om.OMConfigKeys.*;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT;
++import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT;
++import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
 +
 +/**
 + * Access controller for multi-tenancy implemented using Ranger's REST API.
 + * This class is for testing and is not intended for production use.
 + */
 +public class RangerRestMultiTenantAccessController
 +    implements MultiTenantAccessController {
 +
 +  // Ranger admin REST path for policy operations. It is appended to the
 +  // configured Ranger HTTPS address, followed by an ID where one is needed.
 +  public static final String OZONE_OM_RANGER_ADMIN_POLICY_HTTP_ENDPOINT =
 +      "/service/public/v2/api/policy/";
 +  // Ranger admin REST path for role operations, used the same way.
 +  public static final String OZONE_OM_RANGER_ADMIN_ROLE_HTTP_ENDPOINT =
 +      "/service/public/v2/api/roles/";
 +
 +  private static final Logger LOG = LoggerFactory
 +      .getLogger(RangerRestMultiTenantAccessController.class);
 +
 +  // Ozone-typed copy of the configuration handed to the constructor.
 +  private final OzoneConfiguration conf;
 +  // Whether to install a trust-all SSL socket factory. Overwritten from
 +  // OZONE_RANGER_OM_IGNORE_SERVER_CERT in setupRangerConnectionConfig().
 +  private boolean ignoreServerCert = false;
 +  // Timeouts read from configuration, expressed in milliseconds.
 +  private int connectionTimeout;
 +  private int connectionRequestTimeout;
 +  // "Basic " followed by the Base64 encoding of "user:password".
 +  private String authHeaderValue;
 +  // Base address read from OZONE_RANGER_HTTPS_ADDRESS_KEY.
 +  private final String rangerHttpsAddress;
 +  // Gson instance with the Policy, Role and BasicUserPrincipal adapters.
 +  private final Gson jsonConverter;
 +  // Service name read from OZONE_RANGER_SERVICE; getPolicies() keeps only
 +  // policies whose "service" field equals it.
 +  private final String rangerService;
 +  // Two-way mapping between Ozone ACL types and the strings used for them
 +  // in Ranger JSON; both maps are populated by fillRangerAclStrings().
 +  private final Map<IAccessAuthorizer.ACLType, String> aclToString;
 +  private final Map<String, IAccessAuthorizer.ACLType> stringToAcl;
 +
 +  /**
 +   * Builds a controller from the supplied configuration.
 +   *
 +   * Reads the Ranger HTTPS address and service name, creates the Gson
 +   * converter with the Policy, Role and BasicUserPrincipal adapters, fills
 +   * the ACL translation maps and finally initializes the connection
 +   * settings.
 +   *
 +   * @param configuration source of the Ranger related settings.
 +   * @throws IOException declared by the signature; none of the statements
 +   *                     visible here throws it directly.
 +   */
 +  public RangerRestMultiTenantAccessController(Configuration configuration)
 +      throws IOException {
 +    conf = new OzoneConfiguration(configuration);
 +    rangerHttpsAddress = conf.get(OZONE_RANGER_HTTPS_ADDRESS_KEY);
 +    rangerService = conf.get(OZONE_RANGER_SERVICE);
 +
 +    // Same adapters, same registration order, expressed as one chain.
 +    jsonConverter = new GsonBuilder()
 +        .registerTypeAdapter(Policy.class, policySerializer)
 +        .registerTypeAdapter(Policy.class, policyDeserializer)
 +        .registerTypeAdapter(Role.class, roleSerializer)
 +        .registerTypeAdapter(Role.class, roleDeserializer)
 +        .registerTypeAdapter(BasicUserPrincipal.class, userSerializer)
 +        .create();
 +
 +    aclToString = new EnumMap<>(IAccessAuthorizer.ACLType.class);
 +    stringToAcl = new HashMap<>();
 +    fillRangerAclStrings();
 +    initializeRangerConnection();
 +  }
 +
 +  /**
 +   * Populates {@code aclToString} and {@code stringToAcl} with the fixed
 +   * two-way mapping between Ozone ACL types and their Ranger string names.
 +   * {@code NONE} corresponds to the empty string.
 +   */
 +  private void fillRangerAclStrings() {
 +    // Parallel tables: aclNames[i] is the Ranger name of aclTypes[i].
 +    final IAccessAuthorizer.ACLType[] aclTypes = {
 +        IAccessAuthorizer.ACLType.ALL,
 +        IAccessAuthorizer.ACLType.LIST,
 +        IAccessAuthorizer.ACLType.READ,
 +        IAccessAuthorizer.ACLType.WRITE,
 +        IAccessAuthorizer.ACLType.CREATE,
 +        IAccessAuthorizer.ACLType.DELETE,
 +        IAccessAuthorizer.ACLType.READ_ACL,
 +        IAccessAuthorizer.ACLType.WRITE_ACL,
 +        IAccessAuthorizer.ACLType.NONE
 +    };
 +    final String[] aclNames = {
 +        "all", "list", "read", "write", "create", "delete",
 +        "read_acl", "write_acl", ""
 +    };
 +
 +    for (int i = 0; i < aclTypes.length; i++) {
 +      aclToString.put(aclTypes[i], aclNames[i]);
 +      stringToAcl.put(aclNames[i], aclTypes[i]);
 +    }
 +  }
 +
 +  /**
 +   * Prepares the state needed to talk to Ranger: loads the connection
 +   * settings, installs the trust-all SSL socket factory when configured to
 +   * ignore the server certificate, and builds the Basic auth header value.
 +   */
 +  private void initializeRangerConnection() {
 +    // Must run first: it sets ignoreServerCert, which is tested just below.
 +    setupRangerConnectionConfig();
 +    if (ignoreServerCert) {
 +      setupRangerIgnoreServerCertificate();
 +    }
 +    setupRangerConnectionAuthHeader();
 +  }
 +
 +  /**
 +   * Loads the Ranger connection settings from {@code conf}: the connection
 +   * timeout and the connection request timeout (both converted to
 +   * milliseconds) and the flag that disables server certificate checks.
 +   *
 +   * The defaults are handed straight to {@code getTimeDuration}; it already
 +   * falls back to the default when the key is unset, so looking the key up
 +   * a second time through {@code conf.get(key, default)} was redundant.
 +   */
 +  private void setupRangerConnectionConfig() {
 +    // Narrowing cast kept from the original; the fields are ints.
 +    connectionTimeout = (int) conf.getTimeDuration(
 +        OZONE_RANGER_OM_CONNECTION_TIMEOUT,
 +        OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT,
 +        TimeUnit.MILLISECONDS);
 +    connectionRequestTimeout = (int) conf.getTimeDuration(
 +        OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
 +        OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT,
 +        TimeUnit.MILLISECONDS);
 +    ignoreServerCert = conf.getBoolean(
 +        OZONE_RANGER_OM_IGNORE_SERVER_CERT,
 +        OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT);
 +  }
 +
 +  /**
 +   * Installs a default SSL socket factory, inherited by new
 +   * {@link HttpsURLConnection} instances, whose trust manager accepts every
 +   * certificate chain without validation.
 +   *
 +   * This is deliberately insecure and only meant for testing against a
 +   * Ranger instance with an untrusted certificate. Failure to install the
 +   * factory is best-effort: it is logged and otherwise ignored.
 +   */
 +  private void setupRangerIgnoreServerCertificate() {
 +    // Create a trust manager that does not validate certificate chains
 +    TrustManager[] trustAllCerts = new TrustManager[]{
 +        new X509TrustManager() {
 +          @Override
 +          public java.security.cert.X509Certificate[] getAcceptedIssuers() {
 +            // The X509TrustManager contract requires a non-null array.
 +            return new java.security.cert.X509Certificate[0];
 +          }
 +
 +          @Override
 +          public void checkClientTrusted(
 +              java.security.cert.X509Certificate[] certs, String authType) {
 +            // Intentionally empty: every client certificate is accepted.
 +          }
 +
 +          @Override
 +          public void checkServerTrusted(
 +              java.security.cert.X509Certificate[] certs, String authType) {
 +            // Intentionally empty: every server certificate is accepted.
 +          }
 +        }
 +    };
 +
 +    try {
 +      SSLContext sc = SSLContext.getInstance("SSL");
 +      sc.init(null, trustAllCerts, new java.security.SecureRandom());
 +      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
 +    } catch (Exception e) {
 +      // Keep going as before, but record the cause instead of dropping it.
 +      LOG.warn("Setting DefaultSSLSocketFactory failed.", e);
 +    }
 +  }
 +
 +  /**
 +   * Computes {@code authHeaderValue} as "Basic " followed by the Base64
 +   * encoding (UTF-8) of "user:password", where both parts are read from the
 +   * Ranger admin API user and password configuration keys.
 +   */
 +  private void setupRangerConnectionAuthHeader() {
 +    final String credentials =
 +        conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER)
 +            + ":"
 +            + conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
 +    final byte[] base64Bytes =
 +        Base64.encodeBase64(credentials.getBytes(StandardCharsets.UTF_8));
 +    final String base64Text =
 +        new String(base64Bytes, StandardCharsets.UTF_8);
 +    authHeaderValue = "Basic " + base64Text;
 +  }
 +
 +
 +  /**
 +   * Posts the JSON form of {@code policy} to the Ranger policy endpoint and
 +   * returns the value of the "id" field found in the response body.
 +   *
 +   * @param policy the policy to create.
 +   * @return the "id" reported in Ranger's response.
 +   * @throws IOException if the response code is not a successful one.
 +   */
 +  @Override
 +  public long createPolicy(Policy policy) throws IOException {
 +    final JsonObject requestBody =
 +        jsonConverter.toJsonTree(policy).getAsJsonObject();
 +    final HttpsURLConnection connection = makeHttpsPostCall(
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_POLICY_HTTP_ENDPOINT,
 +        requestBody);
 +
 +    if (successfulResponseCode(connection.getResponseCode())) {
 +      final JsonObject created = new JsonParser()
 +          .parse(getResponseData(connection))
 +          .getAsJsonObject();
 +      return created.get("id").getAsLong();
 +    }
 +
 +    throw new IOException(String.format("Failed to create policy %s. " +
 +        "Http response code: %d",
 +        policy.getName(), connection.getResponseCode()));
 +  }
 +
 +  /**
 +   * Issues a delete call against the Ranger policy endpoint for the given
 +   * policy ID.
 +   *
 +   * @param policyID ID of the policy to delete.
 +   * @throws IOException if the response code is not a successful one.
 +   */
 +  @Override
 +  public void deletePolicy(long policyID) throws IOException {
 +    final HttpsURLConnection connection = makeHttpsDeleteCall(
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_POLICY_HTTP_ENDPOINT
 +            + policyID);
 +    if (successfulResponseCode(connection.getResponseCode())) {
 +      return;
 +    }
 +    throw new IOException(String.format("Failed to delete policy %d. " +
 +        "Http response code: %d", policyID, connection.getResponseCode()));
 +  }
 +
 +  /**
 +   * Fetches every policy known to Ranger and returns those whose "service"
 +   * field equals the configured Ranger service, keyed by policy "id".
 +   *
 +   * The endpoint used here returns policies for all services. The
 +   * /public/v2/api/policies/{serviceDefName}/for-resource endpoint is
 +   * supposed to return policies for a single service only, but it does not
 +   * seem to work. Filtering client side is acceptable because this class is
 +   * intended for testing.
 +   *
 +   * @return map from policy ID to policy for the configured service.
 +   * @throws Exception if the response code is not a successful one
 +   *                   (raised as an IOException).
 +   */
 +  @Override
 +  public Map<Long, Policy> getPolicies() throws Exception {
 +    final HttpsURLConnection connection = makeHttpsGetCall(
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_POLICY_HTTP_ENDPOINT);
 +    if (!successfulResponseCode(connection.getResponseCode())) {
 +      throw new IOException(String.format("Failed to get all policies. " +
 +          "Http response code: %d", connection.getResponseCode()));
 +    }
 +
 +    final JsonArray allPolicies = new JsonParser()
 +        .parse(getResponseData(connection))
 +        .getAsJsonArray();
 +
 +    final Map<Long, Policy> ozonePolicies = new HashMap<>();
 +    for (JsonElement element : allPolicies) {
 +      final JsonObject policyJson = element.getAsJsonObject();
 +      // Skip policies that belong to other Ranger services.
 +      if (!policyJson.get("service").getAsString().equals(rangerService)) {
 +        continue;
 +      }
 +      ozonePolicies.put(policyJson.get("id").getAsLong(),
 +          jsonConverter.fromJson(policyJson, Policy.class));
 +    }
 +    return ozonePolicies;
 +  }
 +
 +  /**
 +   * Fetches a single policy from the Ranger policy endpoint and converts
 +   * the response body into a {@code Policy}.
 +   *
 +   * @param policyID ID of the policy to fetch.
 +   * @return the policy deserialized from the response.
 +   * @throws IOException if the response code is not a successful one.
 +   */
 +  @Override
 +  public Policy getPolicy(long policyID) throws IOException {
 +    final HttpsURLConnection connection = makeHttpsGetCall(
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_POLICY_HTTP_ENDPOINT
 +            + policyID);
 +
 +    if (successfulResponseCode(connection.getResponseCode())) {
 +      return jsonConverter.fromJson(getResponseData(connection),
 +          Policy.class);
 +    }
 +    throw new IOException(String.format("Failed to get policy %d. " +
 +        "Http response code: %d", policyID, connection.getResponseCode()));
 +  }
 +
 +  /**
 +   * Sends the JSON form of {@code policy} in a put call to the Ranger
 +   * policy endpoint for the given policy ID.
 +   *
 +   * @param policyID ID of the policy to update.
 +   * @param policy   the new policy content.
 +   * @throws IOException if the response code is not a successful one.
 +   */
 +  @Override
 +  public void updatePolicy(long policyID, Policy policy) throws IOException {
 +    final String policyUrl = rangerHttpsAddress
 +        + OZONE_OM_RANGER_ADMIN_POLICY_HTTP_ENDPOINT + policyID;
 +    final HttpsURLConnection connection =
 +        makeHttpsPutCall(policyUrl, jsonConverter.toJsonTree(policy));
 +
 +    if (successfulResponseCode(connection.getResponseCode())) {
 +      return;
 +    }
 +    throw new IOException(String.format("Failed to update policy %d. " +
 +        "Http response code: %d", policyID, connection.getResponseCode()));
 +  }
 +
 +  /**
 +   * Posts the JSON form of {@code role} to the Ranger role endpoint and
 +   * returns the value of the "id" field found in the response body.
 +   *
 +   * @param role the role to create.
 +   * @return the "id" reported in Ranger's response.
 +   * @throws IOException if the response code is not a successful one.
 +   */
 +  @Override
 +  public long createRole(Role role) throws IOException {
 +    final JsonObject requestBody =
 +        jsonConverter.toJsonTree(role).getAsJsonObject();
 +    final HttpsURLConnection connection = makeHttpsPostCall(
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_ROLE_HTTP_ENDPOINT,
 +        requestBody);
 +
 +    if (successfulResponseCode(connection.getResponseCode())) {
 +      final JsonObject created = new JsonParser()
 +          .parse(getResponseData(connection))
 +          .getAsJsonObject();
 +      return created.get("id").getAsLong();
 +    }
 +
 +    throw new IOException(String.format("Failed to create role %s. " +
 +        "Http response code: %d",
 +        role.getName(), connection.getResponseCode()));
 +  }
 +
 +  /**
 +   * Deletes the Ranger role with the given ID.
 +   *
 +   * @param roleID ID of the role to delete (the "id" value that
 +   *     createRole returns).
 +   * @throws IOException if the request cannot be made or Ranger answers
 +   *     with a response code outside the 2xx range.
 +   */
 +  @Override
 +  public void deleteRole(long roleID) throws IOException {
 +    // Address the role endpoint, like every other role call in this class.
 +    // Using the policy endpoint here would send the DELETE to a policy
 +    // resource that happens to carry the same numeric ID.
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_ROLE_HTTP_ENDPOINT
 +            + roleID;
 +    HttpsURLConnection conn = makeHttpsDeleteCall(rangerAdminUrl);
 +    if (!successfulResponseCode(conn.getResponseCode())) {
 +      throw new IOException(String.format("Failed to delete role %d. " +
 +          "Http response code: %d", roleID, conn.getResponseCode()));
 +    }
 +  }
 +
 +  @Override
 +  public Map<Long, Role> getRoles() throws Exception {
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_ROLE_HTTP_ENDPOINT;
 +    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl);
 +    if (!successfulResponseCode(conn.getResponseCode())) {
 +      throw new IOException(String.format("Failed to get all roles. " +
 +          "Http response code: %d", conn.getResponseCode()));
 +    }
 +
 +    String allRolesString = getResponseData(conn);
 +    JsonArray rolesArrayJson =
 +        new JsonParser().parse(allRolesString).getAsJsonArray();
 +    Map<Long, Role> roles = new HashMap<>();
 +    for (JsonElement roleJson: rolesArrayJson) {
 +      long id = roleJson.getAsJsonObject().get("id").getAsLong();
 +      roles.put(id, jsonConverter.fromJson(roleJson, Role.class));
 +    }
 +
 +    return roles;
 +  }
 +
 +  @Override
 +  public Role getRole(long roleID) throws IOException {
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_ROLE_HTTP_ENDPOINT + 
roleID;
 +
 +    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl);
 +    if (!successfulResponseCode(conn.getResponseCode())) {
 +      throw new IOException(String.format("Failed to get role %d. " +
 +          "Http response code: %d", roleID, conn.getResponseCode()));
 +    }
 +    String roleInfo = getResponseData(conn);
 +    return jsonConverter.fromJson(roleInfo, Role.class);
 +  }
 +
 +  @Override
 +  public void updateRole(long roleID, Role role) throws IOException {
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_ROLE_HTTP_ENDPOINT + 
roleID;
 +
 +    HttpsURLConnection conn = makeHttpsPutCall(rangerAdminUrl,
 +        jsonConverter.toJsonTree(role));
 +    if (!successfulResponseCode(conn.getResponseCode())) {
 +      throw new IOException(String.format("Failed to update role %d. " +
 +          "Http response code: %d", roleID, conn.getResponseCode()));
 +    }
 +  }
 +
 +  private HttpsURLConnection makeHttpsPutCall(String url, JsonElement content)
 +      throws IOException {
 +    HttpsURLConnection connection = makeBaseHttpsURLConnection(url);
 +    connection.setRequestMethod("PUT");
 +    return addJsonContentToConnection(connection, content);
 +  }
 +
 +  private HttpsURLConnection makeHttpsPostCall(String url, JsonElement 
content)
 +      throws IOException {
 +    HttpsURLConnection connection = makeBaseHttpsURLConnection(url);
 +    connection.setRequestMethod("POST");
 +    return addJsonContentToConnection(connection, content);
 +  }
 +
 +  private HttpsURLConnection addJsonContentToConnection(
 +      HttpsURLConnection connection, JsonElement content) throws IOException {
 +    connection.setDoOutput(true);
 +    connection.setRequestProperty("Content-Type", "application/json;");
 +    try (OutputStream os = connection.getOutputStream()) {
 +      byte[] input = content.toString().getBytes(StandardCharsets.UTF_8);
 +      os.write(input, 0, input.length);
 +      os.flush();
 +    }
 +
 +    return connection;
 +  }
 +
 +  private HttpsURLConnection makeHttpsGetCall(String urlString)
 +      throws IOException {
 +    HttpsURLConnection connection = makeBaseHttpsURLConnection(urlString);
 +    connection.setRequestMethod("GET");
 +    return connection;
 +  }
 +
 +  private HttpsURLConnection makeHttpsDeleteCall(String urlString)
 +      throws IOException {
 +    HttpsURLConnection connection = makeBaseHttpsURLConnection(urlString);
 +    connection.setRequestMethod("DELETE");
 +    return connection;
 +  }
 +
 +  private HttpsURLConnection makeBaseHttpsURLConnection(String urlString)
 +      throws IOException {
 +    URL url = new URL(urlString);
 +    HttpsURLConnection urlConnection = 
(HttpsURLConnection)url.openConnection();
 +    urlConnection.setConnectTimeout(connectionTimeout);
 +    urlConnection.setReadTimeout(connectionRequestTimeout);
 +    urlConnection.setRequestProperty("Accept", "application/json");
 +    urlConnection.setRequestProperty("Authorization", authHeaderValue);
 +
 +    return urlConnection;
 +  }
 +
 +  private String getResponseData(HttpsURLConnection urlConnection)
 +      throws IOException {
 +    StringBuilder response = new StringBuilder();
 +    try (BufferedReader br = new BufferedReader(
 +        new InputStreamReader(
 +            urlConnection.getInputStream(), StandardCharsets.UTF_8))) {
 +      String responseLine;
 +      while ((responseLine = br.readLine()) != null) {
 +        response.append(responseLine.trim());
 +      }
 +    }
 +    return response.toString();
 +  }
 +
 +  private boolean successfulResponseCode(long responseCode) {
 +    return responseCode >= 200 && responseCode < 300;
 +  }
 +
 +  /// SERIALIZATION ///
 +
 +  private final JsonDeserializer<Policy> policyDeserializer =
 +      new JsonDeserializer<Policy>() {
-     @Override
-     public Policy deserialize(JsonElement jsonElement, Type type,
-         JsonDeserializationContext jsonDeserializationContext)
-         throws JsonParseException {
-       JsonObject policyJson = jsonElement.getAsJsonObject();
-       String name = policyJson.get("name").getAsString();
-       Policy policy = new Policy(name);
-       if (policyJson.has("description")) {
-         policy.setDescription(policyJson.get("description").getAsString());
-       }
-       policy.setEnabled(policyJson.get("isEnabled").getAsBoolean());
- 
-       // Read volume, bucket, keys from json.
-       JsonObject resourcesJson = 
policyJson.get("resources").getAsJsonObject();
-       // All Ozone Ranger policies specify at least a volume.
-       JsonObject jsonVolumeResource =
-           resourcesJson.get("volume").getAsJsonObject();
-       JsonArray volumes = jsonVolumeResource.get("values").getAsJsonArray();
-       volumes.forEach(vol -> policy.addVolumes(vol.getAsString()));
- 
-       if (resourcesJson.has("bucket")) {
-         JsonObject jsonBucketResource =
-             resourcesJson.get("bucket").getAsJsonObject();
-         JsonArray buckets = jsonBucketResource.get("values").getAsJsonArray();
-         buckets.forEach(bucket -> policy.addBuckets(bucket.getAsString()));
-       }
- 
-       if (resourcesJson.has("key")) {
-         JsonObject jsonKeysResource =
-             resourcesJson.get("key").getAsJsonObject();
-         JsonArray keys = jsonKeysResource.get("values").getAsJsonArray();
-         keys.forEach(key -> policy.addKeys(key.getAsString()));
-       }
++        @Override public Policy deserialize(JsonElement jsonElement, Type 
type,
++            JsonDeserializationContext jsonDeserializationContext)
++            throws JsonParseException {
++          JsonObject policyJson = jsonElement.getAsJsonObject();
++          String name = policyJson.get("name").getAsString();
++          Policy policy = new Policy(name);
++          if (policyJson.has("description")) {
++            
policy.setDescription(policyJson.get("description").getAsString());
++          }
++          policy.setEnabled(policyJson.get("isEnabled").getAsBoolean());
++
++          // Read volume, bucket, keys from json.
++          JsonObject resourcesJson =
++              policyJson.get("resources").getAsJsonObject();
++          // All Ozone Ranger policies specify at least a volume.
++          JsonObject jsonVolumeResource =
++              resourcesJson.get("volume").getAsJsonObject();
++          JsonArray volumes = 
jsonVolumeResource.get("values").getAsJsonArray();
++          volumes.forEach(vol -> policy.addVolumes(vol.getAsString()));
++
++          if (resourcesJson.has("bucket")) {
++            JsonObject jsonBucketResource =
++                resourcesJson.get("bucket").getAsJsonObject();
++            JsonArray buckets =
++                jsonBucketResource.get("values").getAsJsonArray();
++            buckets.forEach(bucket -> 
policy.addBuckets(bucket.getAsString()));
++          }
 +
-       // Read Roles and their ACLs.
-       JsonArray policyItemsJson = policyJson.getAsJsonArray("policyItems");
-       for (JsonElement policyItemElement: policyItemsJson) {
-         JsonObject policyItemJson = policyItemElement.getAsJsonObject();
-         JsonArray jsonRoles = policyItemJson.getAsJsonArray("roles");
-         JsonArray jsonAclArray = policyItemJson.getAsJsonArray("accesses");
- 
-         for (JsonElement jsonAclElem: jsonAclArray) {
-           JsonObject jsonAcl = jsonAclElem.getAsJsonObject();
-           String aclType = jsonAcl.get("type").getAsString();
-           Acl acl;
-           if (jsonAcl.get("isAllowed").getAsBoolean()) {
-             acl = Acl.allow(stringToAcl.get(aclType));
-           } else {
-             acl = Acl.deny(stringToAcl.get(aclType));
++          if (resourcesJson.has("key")) {
++            JsonObject jsonKeysResource =
++                resourcesJson.get("key").getAsJsonObject();
++            JsonArray keys = jsonKeysResource.get("values").getAsJsonArray();
++            keys.forEach(key -> policy.addKeys(key.getAsString()));
 +          }
 +
-           for (JsonElement roleNameJson: jsonRoles) {
-             policy.addRoleAcls(roleNameJson.getAsString(), acl);
++          // Read Roles and their ACLs.
++          JsonArray policyItemsJson = 
policyJson.getAsJsonArray("policyItems");
++          for (JsonElement policyItemElement : policyItemsJson) {
++            JsonObject policyItemJson = policyItemElement.getAsJsonObject();
++            JsonArray jsonRoles = policyItemJson.getAsJsonArray("roles");
++            JsonArray jsonAclArray = 
policyItemJson.getAsJsonArray("accesses");
++
++            for (JsonElement jsonAclElem : jsonAclArray) {
++              JsonObject jsonAcl = jsonAclElem.getAsJsonObject();
++              String aclType = jsonAcl.get("type").getAsString();
++              Acl acl;
++              if (jsonAcl.get("isAllowed").getAsBoolean()) {
++                acl = Acl.allow(stringToAcl.get(aclType));
++              } else {
++                acl = Acl.deny(stringToAcl.get(aclType));
++              }
++
++              for (JsonElement roleNameJson : jsonRoles) {
++                policy.addRoleAcls(roleNameJson.getAsString(), acl);
++              }
++            }
 +          }
-         }
-       }
 +
-       return policy;
-     }
-   };
++          return policy;
++        }
++      };
 +
 +  private final JsonDeserializer<Role> roleDeserializer =
 +      new JsonDeserializer<Role>() {
-     @Override
-     public Role deserialize(JsonElement jsonElement, Type type,
-         JsonDeserializationContext jsonDeserializationContext)
-         throws JsonParseException {
-       JsonObject roleJson = jsonElement.getAsJsonObject();
-       String name = roleJson.get("name").getAsString();
-       Role role = new Role(name);
-       if (roleJson.has("description")) {
-         role.setDescription(roleJson.get("description").getAsString());
-       }
-       for (JsonElement jsonUser: roleJson.get("users").getAsJsonArray()) {
-         String userName =
-             jsonUser.getAsJsonObject().get("name").getAsString();
-         role.addUsers(new BasicUserPrincipal(userName));
-       }
++        @Override public Role deserialize(JsonElement jsonElement, Type type,
++            JsonDeserializationContext jsonDeserializationContext)
++            throws JsonParseException {
++          JsonObject roleJson = jsonElement.getAsJsonObject();
++          String name = roleJson.get("name").getAsString();
++          Role role = new Role(name);
++          if (roleJson.has("description")) {
++            role.setDescription(roleJson.get("description").getAsString());
++          }
++          for (JsonElement jsonUser : roleJson.get("users").getAsJsonArray()) 
{
++            String userName =
++                jsonUser.getAsJsonObject().get("name").getAsString();
++            role.addUsers(new BasicUserPrincipal(userName));
++          }
 +
-       return role;
-     }
-   };
++          return role;
++        }
++      };
 +
 +  private final JsonSerializer<Policy> policySerializer =
 +      new JsonSerializer<Policy>() {
-     @Override
-     public JsonElement serialize(Policy javaPolicy, Type typeOfSrc,
-         JsonSerializationContext context) {
-       JsonObject jsonPolicy = new JsonObject();
-       jsonPolicy.addProperty("name", javaPolicy.getName());
-       jsonPolicy.addProperty("service", rangerService);
-       jsonPolicy.addProperty("isEnabled", javaPolicy.isEnabled());
-       if (javaPolicy.getDescription().isPresent()) {
-         jsonPolicy.addProperty("description",
-             javaPolicy.getDescription().get());
-       }
++        @Override public JsonElement serialize(Policy javaPolicy,
++            Type typeOfSrc, JsonSerializationContext context) {
++          JsonObject jsonPolicy = new JsonObject();
++          jsonPolicy.addProperty("name", javaPolicy.getName());
++          jsonPolicy.addProperty("service", rangerService);
++          jsonPolicy.addProperty("isEnabled", javaPolicy.isEnabled());
++          if (javaPolicy.getDescription().isPresent()) {
++            jsonPolicy.addProperty("description",
++                javaPolicy.getDescription().get());
++          }
 +
-       // All resources under this policy are added to this object.
-       JsonObject jsonResources = new JsonObject();
++          // All resources under this policy are added to this object.
++          JsonObject jsonResources = new JsonObject();
 +
-       // Add volumes. Ranger requires at least one volume to be specified.
-       JsonArray jsonVolumeNameArray = new JsonArray();
-       for (String volumeName: javaPolicy.getVolumes()) {
-         jsonVolumeNameArray.add(new JsonPrimitive(volumeName));
-       }
-       JsonObject jsonVolumeResource = new JsonObject();
-       jsonVolumeResource.add("values", jsonVolumeNameArray);
-       jsonVolumeResource.addProperty("isRecursive", false);
-       jsonVolumeResource.addProperty("isExcludes", false);
-       jsonResources.add("volume", jsonVolumeResource);
- 
-       // Add buckets.
-       JsonArray jsonBucketNameArray = new JsonArray();
-       for (String bucketName: javaPolicy.getBuckets()) {
-         jsonBucketNameArray.add(new JsonPrimitive(bucketName));
-       }
++          // Add volumes. Ranger requires at least one volume to be specified.
++          JsonArray jsonVolumeNameArray = new JsonArray();
++          for (String volumeName : javaPolicy.getVolumes()) {
++            jsonVolumeNameArray.add(new JsonPrimitive(volumeName));
++          }
++          JsonObject jsonVolumeResource = new JsonObject();
++          jsonVolumeResource.add("values", jsonVolumeNameArray);
++          jsonVolumeResource.addProperty("isRecursive", false);
++          jsonVolumeResource.addProperty("isExcludes", false);
++          jsonResources.add("volume", jsonVolumeResource);
++
++          // Add buckets.
++          JsonArray jsonBucketNameArray = new JsonArray();
++          for (String bucketName : javaPolicy.getBuckets()) {
++            jsonBucketNameArray.add(new JsonPrimitive(bucketName));
++          }
 +
-       if (jsonBucketNameArray.size() > 0) {
-         JsonObject jsonBucketResource = new JsonObject();
-         jsonBucketResource.add("values", jsonBucketNameArray);
-         jsonBucketResource.addProperty("isRecursive", false);
-         jsonBucketResource.addProperty("isExcludes", false);
-         jsonResources.add("bucket", jsonBucketResource);
-       }
++          if (jsonBucketNameArray.size() > 0) {
++            JsonObject jsonBucketResource = new JsonObject();
++            jsonBucketResource.add("values", jsonBucketNameArray);
++            jsonBucketResource.addProperty("isRecursive", false);
++            jsonBucketResource.addProperty("isExcludes", false);
++            jsonResources.add("bucket", jsonBucketResource);
++          }
 +
-       // Add keys.
-       JsonArray jsonKeyNameArray = new JsonArray();
-       for (String keyName: javaPolicy.getKeys()) {
-         jsonKeyNameArray.add(new JsonPrimitive(keyName));
-       }
-       if (jsonKeyNameArray.size() > 0) {
-         JsonObject jsonKeyResource = new JsonObject();
-         jsonKeyResource.add("values", jsonKeyNameArray);
-         jsonKeyResource.addProperty("isRecursive", false);
-         jsonKeyResource.addProperty("isExcludes", false);
-         jsonResources.add("key", jsonKeyResource);
-       }
++          // Add keys.
++          JsonArray jsonKeyNameArray = new JsonArray();
++          for (String keyName : javaPolicy.getKeys()) {
++            jsonKeyNameArray.add(new JsonPrimitive(keyName));
++          }
++          if (jsonKeyNameArray.size() > 0) {
++            JsonObject jsonKeyResource = new JsonObject();
++            jsonKeyResource.add("values", jsonKeyNameArray);
++            jsonKeyResource.addProperty("isRecursive", false);
++            jsonKeyResource.addProperty("isExcludes", false);
++            jsonResources.add("key", jsonKeyResource);
++          }
 +
-       jsonPolicy.add("resources", jsonResources);
- 
-       // Add roles and their acls to the policy.
-       JsonArray jsonPolicyItemArray = new JsonArray();
- 
-       // Make a new policy item for each role in the map.
-       Map<String, Collection<Acl>> roleAcls = javaPolicy.getRoleAcls();
-       for (Map.Entry<String, Collection<Acl>> entry: roleAcls.entrySet()) {
-         // Add role to the policy item.
-         String roleName = entry.getKey();
-         JsonObject jsonPolicyItem = new JsonObject();
-         JsonArray jsonRoles = new JsonArray();
-         jsonRoles.add(new JsonPrimitive(roleName));
-         jsonPolicyItem.add("roles", jsonRoles);
- 
-         // Add acls to the policy item.
-         JsonArray jsonAclArray = new JsonArray();
-         for (Acl acl: entry.getValue()) {
-           JsonObject jsonAcl  = new JsonObject();
-           jsonAcl.addProperty("type",
-               aclToString.get(acl.getAclType()));
-           jsonAcl.addProperty("isAllowed", acl.isAllowed());
-           jsonAclArray.add(jsonAcl);
-           jsonPolicyItem.add("accesses", jsonAclArray);
-         }
-         jsonPolicyItemArray.add(jsonPolicyItem);
-       }
-       jsonPolicy.add("policyItems", jsonPolicyItemArray);
++          jsonPolicy.add("resources", jsonResources);
++
++          // Add roles and their acls to the policy.
++          JsonArray jsonPolicyItemArray = new JsonArray();
++
++          // Make a new policy item for each role in the map.
++          Map<String, Collection<Acl>> roleAcls = javaPolicy.getRoleAcls();
++          for (Map.Entry<String, Collection<Acl>> entry : 
roleAcls.entrySet()) {
++            // Add role to the policy item.
++            String roleName = entry.getKey();
++            JsonObject jsonPolicyItem = new JsonObject();
++            JsonArray jsonRoles = new JsonArray();
++            jsonRoles.add(new JsonPrimitive(roleName));
++            jsonPolicyItem.add("roles", jsonRoles);
++
++            // Add acls to the policy item.
++            JsonArray jsonAclArray = new JsonArray();
++            for (Acl acl : entry.getValue()) {
++              JsonObject jsonAcl = new JsonObject();
++              jsonAcl.addProperty("type", aclToString.get(acl.getAclType()));
++              jsonAcl.addProperty("isAllowed", acl.isAllowed());
++              jsonAclArray.add(jsonAcl);
++              jsonPolicyItem.add("accesses", jsonAclArray);
++            }
++            jsonPolicyItemArray.add(jsonPolicyItem);
++          }
++          jsonPolicy.add("policyItems", jsonPolicyItemArray);
 +
-       return jsonPolicy;
-     }
-   };
++          return jsonPolicy;
++        }
++      };
 +
 +  private final JsonSerializer<Role> roleSerializer =
 +      new JsonSerializer<Role>() {
-     @Override
-     public JsonElement serialize(Role javaRole, Type typeOfSrc,
-         JsonSerializationContext context) {
-       JsonObject jsonRole = new JsonObject();
-       jsonRole.addProperty("name", javaRole.getName());
- 
-       JsonArray jsonUserArray = new JsonArray();
-       for (BasicUserPrincipal javaUser: javaRole.getUsers()) {
-         jsonUserArray.add(jsonConverter.toJsonTree(javaUser));
-       }
++        @Override public JsonElement serialize(Role javaRole, Type typeOfSrc,
++            JsonSerializationContext context) {
++          JsonObject jsonRole = new JsonObject();
++          jsonRole.addProperty("name", javaRole.getName());
++
++          JsonArray jsonUserArray = new JsonArray();
++          for (BasicUserPrincipal javaUser : javaRole.getUsers()) {
++            jsonUserArray.add(jsonConverter.toJsonTree(javaUser));
++          }
 +
-       jsonRole.add("users", jsonUserArray);
-       return jsonRole;
-     }
-   };
++          jsonRole.add("users", jsonUserArray);
++          return jsonRole;
++        }
++      };
 +
 +  private final JsonSerializer<BasicUserPrincipal> userSerializer =
 +      new JsonSerializer<BasicUserPrincipal>() {
-     @Override
-     public JsonElement serialize(BasicUserPrincipal user, Type typeOfSrc,
-                                  JsonSerializationContext context) {
-         JsonObject jsonMember = new JsonObject();
-         jsonMember.addProperty("name", user.getName());
-         jsonMember.addProperty("isAdmin", false);
-         return jsonMember;
-     }
-   };
++        @Override public JsonElement serialize(BasicUserPrincipal user,
++            Type typeOfSrc, JsonSerializationContext context) {
++          JsonObject jsonMember = new JsonObject();
++          jsonMember.addProperty("name", user.getName());
++          jsonMember.addProperty("isAdmin", false);
++          return jsonMember;
++        }
++      };
 +}
diff --cc 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 7a48f28917,29d5a09f68..6248f27be0
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@@ -86,6 -80,6 +87,8 @@@ import org.apache.hadoop.ozone.protocol
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyArgs;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeProgressRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeProgressResponse;
@@@ -98,6 -92,6 +101,8 @@@ import org.apache.hadoop.ozone.protocol
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3SecretRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3SecretResponse;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3VolumeContextRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3VolumeContextResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVolumeRequest;
@@@ -144,18 -136,12 +149,24 @@@ import org.apache.hadoop.ozone.protocol
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeysRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RevokeS3SecretRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3Secret;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetBucketPropertyRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetS3SecretRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetS3SecretResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyRequest;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAssignAdminRequest;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAssignUserAccessIdRequest;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAssignUserAccessIdResponse;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantGetUserInfoRequest;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantGetUserInfoResponse;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantListUserRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantListUserResponse;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantRevokeAdminRequest;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantRevokeUserAccessIdRequest;
  import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
  import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
diff --cc hadoop-ozone/dist/src/main/smoketest/upgrade/generate.robot
index 7493c65ea9,f6de86add8..acecbab5f8
--- a/hadoop-ozone/dist/src/main/smoketest/upgrade/generate.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/upgrade/generate.robot
@@@ -30,30 -30,7 +31,35 @@@ Create a volume and bucke
                          Should not contain  ${output}       Failed
      ${output} =         Execute          ozone sh bucket create 
/${PREFIX}-volume/${PREFIX}-bucket
                          Should not contain  ${output}       Failed
+ 
+ Create key
 -    ${output} =         Execute          ozone sh key put 
/${PREFIX}-volume/${PREFIX}-bucket/${PREFIX}-key /opt/hadoop/NOTICE.txt
 +                        Execute and checkrc    echo "${PREFIX}: key created 
using Ozone Shell" > /tmp/sourcekey    0
 +    ${output} =         Execute          ozone sh key put 
/${PREFIX}-volume/${PREFIX}-bucket/${PREFIX}-key /tmp/sourcekey
                          Should not contain  ${output}       Failed
 +                        Execute and checkrc    rm /tmp/sourcekey    0
 +
- Create a bucket and key in volume s3v
++Create a bucket in s3v volume
++    [Tags]    create-volume-and-bucket
 +    ${output} =         Execute          ozone sh bucket create 
/s3v/${PREFIX}-bucket
 +                        Should not contain  ${output}       Failed
++
++Create key in the bucket in s3v volume
 +                        Execute and checkrc    echo "${PREFIX}: another key 
created using Ozone Shell" > /tmp/sourcekey    0
 +    ${output} =         Execute          ozone sh key put 
/s3v/${PREFIX}-bucket/key1-shell /tmp/sourcekey
 +                        Should not contain  ${output}       Failed
 +                        Execute and checkrc    rm /tmp/sourcekey    0
 +
 +Setup credentials for S3
 +    # TODO: Run "Setup secure v4 headers" instead when security is enabled
 +    Run Keyword         Setup dummy credentials for S3
 +
 +Try to create a bucket using S3 API
 +    # Note: S3 API does not return error if the bucket already exists
 +    ${output} =         Create bucket with name    ${PREFIX}-bucket
 +                        Should Be Equal    ${output}    ${None}
 +
 +Create key using S3 API
 +                        Execute and checkrc    echo "${PREFIX}: key created 
using S3 API" > /tmp/sourcekey    0
 +    ${output} =         Execute AWSS3APICli and checkrc    put-object 
--bucket ${PREFIX}-bucket --key key2-s3api --body /tmp/sourcekey    0
 +                        Should not contain    ${output}    error
 +                        Execute and checkrc    rm /tmp/sourcekey    0
diff --cc hadoop-ozone/dist/src/shell/ozone/ozone
index 12a3997675,72be8cfb45..8a0d81bfa7
--- a/hadoop-ozone/dist/src/shell/ozone/ozone
+++ b/hadoop-ozone/dist/src/shell/ozone/ozone
@@@ -168,12 -167,9 +168,13 @@@ function ozonecmd_cas
      s3g)
        OZONE_SUBCMD_SUPPORTDAEMONIZATION="true"
        OZONE_CLASSNAME='org.apache.hadoop.ozone.s3.Gateway'
+       OZONE_S3G_OPTS="${OZONE_S3G_OPTS} 
-Dlog4j.configurationFile=${OZONE_CONF_DIR}/s3g-audit-log4j2.properties"
        OZONE_RUN_ARTIFACT_NAME="ozone-s3gateway"
      ;;
 +    tenant)
 +      OZONE_CLASSNAME=org.apache.hadoop.ozone.shell.tenant.TenantShell
 +      OZONE_RUN_ARTIFACT_NAME="ozone-tools"
 +    ;;
      csi)
        OZONE_SUBCMD_SUPPORTDAEMONIZATION="true"
        OZONE_CLASSNAME='org.apache.hadoop.ozone.csi.CsiServer'
diff --cc 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 6c20625946,ef561e657b..15eda5a4b9
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@@ -68,15 -68,11 +68,15 @@@ public class TestOzoneConfigurationFiel
      configurationPropsToSkipCompare
          .add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
      configurationPropsToSkipCompare
-         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY);
+         .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY);
      configurationPropsToSkipCompare
-         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION);
+         .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
      // This property is tested in TestHttpServer2 instead
      xmlPropsToSkipCompare.add(HttpServer2.HTTP_IDLE_TIMEOUT_MS_KEY);
 +
 +    // TODO: Remove this once ranger configs are finalized in HDDS-5836
 +    configurationPrefixToSkipCompare.add("ozone.om.ranger");
 +
      addPropertiesNotInXml();
    }
  
diff --cc 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
index a8aabc0593,0000000000..138ad57470
mode 100644,000000..100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
@@@ -1,244 -1,0 +1,244 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.om.multitenant;
 +
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 +import org.apache.hadoop.ozone.MiniOzoneCluster;
 +import org.apache.hadoop.ozone.client.ObjectStore;
 +import org.apache.hadoop.ozone.client.OzoneBucket;
 +import org.apache.hadoop.ozone.client.OzoneVolume;
 +import org.apache.hadoop.ozone.client.rpc.RpcClient;
 +import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
 +import org.apache.hadoop.ozone.om.exceptions.OMException;
 +import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 +import org.apache.hadoop.ozone.om.protocol.S3Auth;
 +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
 +import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 +import org.apache.ozone.test.GenericTestUtils;
 +import org.apache.ozone.test.LambdaTestUtils;
 +import org.apache.ozone.test.LambdaTestUtils.VoidCallable;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +import java.util.UUID;
 +import java.util.concurrent.TimeoutException;
 +
 +import static 
org.apache.hadoop.ozone.admin.scm.FinalizeUpgradeCommandUtil.isDone;
 +import static 
org.apache.hadoop.ozone.admin.scm.FinalizeUpgradeCommandUtil.isStarting;
 +
 +/**
 + * Tests that S3 requests for a tenant are directed to that tenant's volume,
 + * and that users not belonging to a tenant are directed to the default S3
 + * volume.
 + */
 +public class TestMultiTenantVolume {
 +  private static MiniOzoneCluster cluster;
 +  private static String s3VolumeName;
 +
 +  private static final String TENANT_ID = "tenant";
 +  private static final String USER_PRINCIPAL = "username";
 +  private static final String BUCKET_NAME = "bucket";
 +  private static final String ACCESS_ID = "tenant$username";
 +
 +  /**
 +   * Builds a mini cluster without datanodes whose OM starts at the initial
 +   * layout version, runs the pre-finalization checks against it and then
 +   * finalizes the OM upgrade, so the test methods run on a finalized OM.
 +   */
 +  @BeforeClass
 +  public static void initClusterProvider() throws Exception {
 +    final OzoneConfiguration ozoneConf = new OzoneConfiguration();
 +    // NOTE(review): presumably bypasses Ranger for the test; confirm against
 +    // OMMultiTenantManagerImpl.
 +    ozoneConf.setBoolean(
 +        OMMultiTenantManagerImpl.OZONE_OM_TENANT_DEV_SKIP_RANGER, true);
 +
 +    cluster = MiniOzoneCluster.newBuilder(ozoneConf)
 +        .withoutDatanodes()
 +        .setOmLayoutVersion(OMLayoutFeature.INITIAL_VERSION.layoutVersion())
 +        .build();
 +    s3VolumeName = HddsClientUtils.getDefaultS3VolumeName(ozoneConf);
 +
 +    final ObjectStore tenantStore = getStoreForAccessID(ACCESS_ID);
 +    preFinalizationChecks(tenantStore);
 +    finalizeOMUpgrade();
 +  }
 +
 +  /**
 +   * Shuts down the mini cluster, if one was started.
 +   */
 +  @AfterClass
 +  public static void shutdownClusterProvider() {
 +    // cluster is still null when initClusterProvider() fails before the
 +    // assignment; guard so teardown does not add an NPE to that failure.
 +    if (cluster != null) {
 +      cluster.shutdown();
 +    }
 +  }
 +
 +  private static void expectFailurePreFinalization(VoidCallable eval)
 +      throws Exception {
 +    LambdaTestUtils.intercept(OMException.class,
 +        "cannot be invoked before finalization", eval);
 +  }
 +
 +  /**
 +   * Sanity checks run before upgrade finalization is triggered: every
 +   * tenant API call is expected to be rejected, while the S3 secret
 +   * get/set/revoke calls are exercised and expected to succeed.
 +   */
 +  private static void preFinalizationChecks(ObjectStore store)
 +      throws Exception {
 +
 +    // Tenant read APIs are rejected before finalization ...
 +    expectFailurePreFinalization(() -> store.listTenant());
 +    expectFailurePreFinalization(
 +        () -> store.listUsersInTenant(TENANT_ID, ""));
 +    expectFailurePreFinalization(
 +        () -> store.tenantGetUserInfo(USER_PRINCIPAL));
 +
 +    // ... and so are the tenant write APIs.
 +    expectFailurePreFinalization(
 +        () -> store.createTenant(TENANT_ID));
 +    expectFailurePreFinalization(
 +        () -> store.tenantAssignUserAccessId(
 +            USER_PRINCIPAL, TENANT_ID, ACCESS_ID));
 +    expectFailurePreFinalization(
 +        () -> store.tenantAssignAdmin(USER_PRINCIPAL, TENANT_ID, true));
 +    expectFailurePreFinalization(
 +        () -> store.tenantRevokeAdmin(ACCESS_ID, TENANT_ID));
 +    expectFailurePreFinalization(
 +        () -> store.tenantRevokeUserAccessId(ACCESS_ID));
 +    expectFailurePreFinalization(
 +        () -> store.deleteTenant(TENANT_ID));
 +
 +    // S3 get/set/revoke secret APIs still work before finalization
 +    final String s3AccessId = "testUser1accessId1";
 +    final String newSecret = "testsecret";
 +
 +    final S3SecretValue fetched = store.getS3Secret(s3AccessId);
 +    Assert.assertEquals(s3AccessId, fetched.getAwsAccessKey());
 +
 +    final S3SecretValue updated = store.setS3Secret(s3AccessId, newSecret);
 +    Assert.assertEquals(s3AccessId, updated.getAwsAccessKey());
 +    Assert.assertEquals(newSecret, updated.getAwsSecret());
 +
 +    store.revokeS3Secret(s3AccessId);
 +  }
 +
 +  /**
 +   * Trigger OM upgrade finalization from the client and block until
 +   * completion (status FINALIZATION_DONE).
 +   * <p>
 +   * An {@link IOException} raised while polling the finalization progress
 +   * fails the test with an {@link AssertionError} that keeps the original
 +   * exception as its cause.
 +   */
 +  private static void finalizeOMUpgrade()
 +      throws IOException, InterruptedException, TimeoutException {
 +
 +    // Trigger OM upgrade finalization. Ref: FinalizeUpgradeSubCommand#call
 +    final OzoneManagerProtocol client = cluster.getRpcClient()
 +        .getObjectStore().getClientProxy().getOzoneManagerClient();
 +    final String upgradeClientID = "Test-Upgrade-Client-" + UUID.randomUUID();
 +    UpgradeFinalizer.StatusAndMessages finalizationResponse =
 +        client.finalizeUpgrade(upgradeClientID);
 +
 +    // The status should transition as soon as the client call above returns
 +    Assert.assertTrue(isStarting(finalizationResponse.status()));
 +
 +    // Wait for the finalization to be marked as done.
 +    // 10s timeout should be plenty.
 +    GenericTestUtils.waitFor(() -> {
 +      try {
 +        final UpgradeFinalizer.StatusAndMessages progress =
 +            client.queryUpgradeFinalizationProgress(
 +                upgradeClientID, false, false);
 +        return isDone(progress.status());
 +      } catch (IOException e) {
 +        // Keep the cause so the failure report shows the underlying error.
 +        throw new AssertionError("Unexpected exception while waiting for "
 +            + "the OM upgrade to finalize: " + e.getMessage(), e);
 +      }
 +    }, 500, 10000);
 +  }
 +
 +  /**
 +   * A client that does not belong to a tenant should work against the
 +   * default S3 volume for bucket creation, lookup and deletion.
 +   */
 +  @Test
 +  public void testDefaultS3Volume() throws Exception {
 +    // Default client not belonging to a tenant should end up in the S3
 +    // volume.
 +    final ObjectStore defaultStore = cluster.getClient().getObjectStore();
 +    final String resolvedVolume = defaultStore.getS3Volume().getName();
 +    Assert.assertEquals(s3VolumeName, resolvedVolume);
 +
 +    // Create bucket.
 +    defaultStore.createS3Bucket(BUCKET_NAME);
 +    final OzoneBucket created = defaultStore.getS3Bucket(BUCKET_NAME);
 +    Assert.assertEquals(s3VolumeName, created.getVolumeName());
 +
 +    // Delete bucket.
 +    defaultStore.deleteS3Bucket(BUCKET_NAME);
 +    assertS3BucketNotFound(defaultStore, BUCKET_NAME);
 +  }
 +
 +  /**
 +   * An access ID assigned to a tenant should have its S3 operations
 +   * resolved to the tenant volume, and a bucket created there should not be
 +   * visible through an access ID that is not assigned to the tenant.
 +   */
 +  @Test
 +  public void testS3TenantVolume() throws Exception {
 +    final ObjectStore tenantStore = getStoreForAccessID(ACCESS_ID);
 +
 +    // Create the tenant and assign the access ID to the user.
 +    tenantStore.createTenant(TENANT_ID);
 +    tenantStore.tenantAssignUserAccessId(
 +        USER_PRINCIPAL, TENANT_ID, ACCESS_ID);
 +
 +    // S3 volume pointed to by the store should be for the tenant.
 +    final String tenantVolume = tenantStore.getS3Volume().getName();
 +    Assert.assertEquals(TENANT_ID, tenantVolume);
 +
 +    // Create bucket in the tenant volume.
 +    tenantStore.createS3Bucket(BUCKET_NAME);
 +    Assert.assertEquals(TENANT_ID,
 +        tenantStore.getS3Bucket(BUCKET_NAME).getVolumeName());
 +
 +    // A different user should not see bucket, since they will be directed to
 +    // the s3 volume.
 +    final String otherAccessId = UUID.randomUUID().toString();
 +    final ObjectStore otherStore = getStoreForAccessID(otherAccessId);
 +    assertS3BucketNotFound(otherStore, BUCKET_NAME);
 +
 +    // Delete bucket.
 +    tenantStore.deleteS3Bucket(BUCKET_NAME);
 +    assertS3BucketNotFound(tenantStore, BUCKET_NAME);
 +
 +    // Undo the tenant setup.
 +    tenantStore.tenantRevokeUserAccessId(ACCESS_ID);
 +    tenantStore.deleteTenant(TENANT_ID);
 +  }
 +
 +  /**
 +   * Asserts that the bucket is not found, both using
 +   * {@link ObjectStore#getS3Bucket} and through the designated S3 volume
 +   * pointed to by the ObjectStore.
 +   *
 +   * @param store      object store used for both lookups
 +   * @param bucketName name of the bucket that must not exist
 +   * @throws IOException if a lookup fails with a result other than
 +   *                     BUCKET_NOT_FOUND
 +   */
 +  private void assertS3BucketNotFound(ObjectStore store, String bucketName)
 +      throws IOException {
 +    try {
 +      store.getS3Bucket(bucketName);
 +      // A successful lookup means the bucket exists: fail explicitly,
 +      // otherwise this helper would pass without checking anything.
 +      Assert.fail("Bucket '" + bucketName
 +          + "' was unexpectedly found via getS3Bucket");
 +    } catch (OMException ex) {
 +      if (ex.getResult() != OMException.ResultCodes.BUCKET_NOT_FOUND) {
 +        throw ex;
 +      }
 +    }
 +
 +    try {
 +      OzoneVolume volume = store.getS3Volume();
 +      volume.getBucket(bucketName);
 +      Assert.fail("Bucket '" + bucketName
 +          + "' was unexpectedly found in S3 volume '" + volume.getName()
 +          + "'");
 +    } catch (OMException ex) {
 +      if (ex.getResult() != OMException.ResultCodes.BUCKET_NOT_FOUND) {
 +        throw ex;
 +      }
 +    }
 +  }
 +
 +  private static ObjectStore getStoreForAccessID(String accessID)
 +      throws IOException {
 +    // Read the configuration back from the OM: the cluster provider modifies
 +    // the configuration we passed in, and the client must be built from that.
 +    OzoneConfiguration conf = cluster.getOzoneManager().getConfiguration();
 +    // Construct the RpcClient and ObjectStore by hand, rather than using the
 +    // cluster-provided ones, so that the S3 access ID can be specified.
 +    RpcClient client = new RpcClient(conf, null);
 +    // For this test the user principal is set to the same value as accessID.
-     client.setTheadLocalS3Auth(
++    client.setThreadLocalS3Auth(
 +        new S3Auth("unused1", "unused2", accessID, accessID));
 +    return new ObjectStore(conf, client);
 +  }
 +}
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 83708c20d0,1a0e7d461c..445218f530
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@@ -67,9 -67,7 +67,10 @@@ import org.apache.hadoop.hdds.protocol.
  import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
  import org.apache.hadoop.hdds.scm.ScmInfo;
  import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 +import org.apache.hadoop.hdds.utils.db.Table;
 +import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 +import org.apache.hadoop.hdds.utils.db.TableIterator;
+ import org.apache.hadoop.ozone.OzoneManagerVersion;
  import org.apache.hadoop.ozone.om.helpers.BucketLayout;
  import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
  import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index ef688ee589,ef61417c33..0df19af147
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@@ -182,43 -179,12 +182,43 @@@ public class OMDBDefinition implements 
  
    public static final DBColumnFamilyDefinition<String, String>
        META_TABLE = new DBColumnFamilyDefinition<>(
-       OmMetadataManagerImpl.META_TABLE,
-       String.class,
-       new StringCodec(),
-       String.class,
-       new StringCodec());
+           OmMetadataManagerImpl.META_TABLE,
+           String.class,
+           new StringCodec(),
+           String.class,
+           new StringCodec());
  
 +  // Tables for S3 multi-tenancy
 +
 +  public static final DBColumnFamilyDefinition<String, OmDBAccessIdInfo>
 +            TENANT_ACCESS_ID_TABLE =
 +            new DBColumnFamilyDefinition<>(
 +                    OmMetadataManagerImpl.TENANT_ACCESS_ID_TABLE,
 +                    String.class,  // accessId
 +                    new StringCodec(),
 +                    OmDBAccessIdInfo.class,  // tenantId, secret, principal
 +                    new OmDBAccessIdInfoCodec());
 +
 +  public static final DBColumnFamilyDefinition<String, OmDBUserPrincipalInfo>
 +            PRINCIPAL_TO_ACCESS_IDS_TABLE =
 +            new DBColumnFamilyDefinition<>(
 +                    OmMetadataManagerImpl.PRINCIPAL_TO_ACCESS_IDS_TABLE,
 +                    String.class,  // User principal
 +                    new StringCodec(),
 +                    OmDBUserPrincipalInfo.class,  // List of accessIds
 +                    new OmDBUserPrincipalInfoCodec());
 +
 +  public static final DBColumnFamilyDefinition<String, OmDBTenantState>
 +            TENANT_STATE_TABLE =
 +            new DBColumnFamilyDefinition<>(
 +                    OmMetadataManagerImpl.TENANT_STATE_TABLE,
 +                    String.class,  // tenantId (tenant name)
 +                    new StringCodec(),
 +                    OmDBTenantState.class,
 +                    new OmDBTenantStateCodec());
 +
 +  // End tables for S3 multi-tenancy
 +
    @Override
    public String getName() {
      return OzoneConsts.OM_DB_NAME;
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 114a528ad4,e9a4be8acc..b425ea4424
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@@ -176,42 -174,85 +181,99 @@@ public final class OzoneManagerRatisUti
        return new OMPrepareRequest(omRequest);
      case CancelPrepare:
        return new OMCancelPrepareRequest(omRequest);
 +    case SetS3Secret:
 +      return new OMSetSecretRequest(omRequest);
      case RevokeS3Secret:
        return new S3RevokeSecretRequest(omRequest);
+     case PurgeKeys:
+       return new OMKeyPurgeRequest(omRequest);
+     case PurgePaths:
+       return new OMPathsPurgeRequestWithFSO(omRequest);
 +    case CreateTenant:
 +      return new OMTenantCreateRequest(omRequest);
 +    case DeleteTenant:
 +      return new OMTenantDeleteRequest(omRequest);
 +    case TenantAssignUserAccessId:
 +      return new OMTenantAssignUserAccessIdRequest(omRequest);
 +    case TenantRevokeUserAccessId:
 +      return new OMTenantRevokeUserAccessIdRequest(omRequest);
 +    case TenantAssignAdmin:
 +      return new OMTenantAssignAdminRequest(omRequest);
 +    case TenantRevokeAdmin:
 +      return new OMTenantRevokeAdminRequest(omRequest);
  
-     /**
-      * Following key requests will be created in {@link OMKeyRequestFactory}.
+     /*
+      * Key requests that can have multiple variants based on the bucket layout
+      * should be created using {@link BucketLayoutAwareOMKeyRequestFactory}.
       */
      case CreateDirectory:
+       keyArgs = omRequest.getCreateDirectoryRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case CreateFile:
+       keyArgs = omRequest.getCreateFileRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case CreateKey:
+       keyArgs = omRequest.getCreateKeyRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case AllocateBlock:
+       keyArgs = omRequest.getAllocateBlockRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case CommitKey:
+       keyArgs = omRequest.getCommitKeyRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case DeleteKey:
+       keyArgs = omRequest.getDeleteKeyRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case DeleteKeys:
+       OzoneManagerProtocolProtos.DeleteKeyArgs deleteKeyArgs =
+           omRequest.getDeleteKeysRequest()
+               .getDeleteKeys();
+       volumeName = deleteKeyArgs.getVolumeName();
+       bucketName = deleteKeyArgs.getBucketName();
+       break;
      case RenameKey:
+       keyArgs = omRequest.getRenameKeyRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case RenameKeys:
-     case PurgeKeys:
-     case PurgePaths:
+       OzoneManagerProtocolProtos.RenameKeysArgs renameKeysArgs =
+           omRequest.getRenameKeysRequest().getRenameKeysArgs();
+       volumeName = renameKeysArgs.getVolumeName();
+       bucketName = renameKeysArgs.getBucketName();
+       break;
      case InitiateMultiPartUpload:
+       keyArgs = omRequest.getInitiateMultiPartUploadRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case CommitMultiPartUpload:
+       keyArgs = omRequest.getCommitMultiPartUploadRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case AbortMultiPartUpload:
+       keyArgs = omRequest.getAbortMultiPartUploadRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      case CompleteMultiPartUpload:
-       return OMKeyRequestFactory.createRequest(omRequest, ozoneManager);
+       keyArgs = omRequest.getCompleteMultiPartUploadRequest().getKeyArgs();
+       volumeName = keyArgs.getVolumeName();
+       bucketName = keyArgs.getBucketName();
+       break;
      default:
        throw new IllegalStateException("Unrecognized write command " +
            "type request" + cmdType);
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
index a1b5702a89,0000000000..63318f0161
mode 100644,000000..100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
@@@ -1,204 -1,0 +1,207 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.ozone.om.request.s3.security;
 +
 +import com.google.common.base.Optional;
 +import org.apache.commons.lang3.StringUtils;
 +import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 +import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 +import org.apache.hadoop.ipc.ProtobufRpcEngine;
 +import org.apache.hadoop.ozone.OzoneConsts;
 +import org.apache.hadoop.ozone.audit.OMAction;
 +import org.apache.hadoop.ozone.om.OMMetadataManager;
 +import org.apache.hadoop.ozone.om.OzoneManager;
 +import org.apache.hadoop.ozone.om.exceptions.OMException;
 +import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 +import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 +import org.apache.hadoop.ozone.om.request.OMClientRequest;
 +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 +import org.apache.hadoop.ozone.om.response.OMClientResponse;
 +import org.apache.hadoop.ozone.om.response.s3.security.OMSetSecretResponse;
- import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetS3SecretRequest;
++import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetS3SecretResponse;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
 +
 +/**
 + * Handles SetSecret request.
 + */
 +public class OMSetSecretRequest extends OMClientRequest {
 +
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(OMSetSecretRequest.class);
 +
 +  /**
 +   * Creates the handler for a SetSecret request.
 +   *
 +   * @param omRequest the OM request, passed through to
 +   *     {@link OMClientRequest}
 +   */
 +  public OMSetSecretRequest(OMRequest omRequest) {
 +    super(omRequest);
 +  }
 +
 +  /**
 +   * Validates a SetS3Secret request before it is executed.
 +   * <p>
 +   * Checks, in order: the accessId is present in the tenant accessId table
 +   * or in the (old) S3SecretTable; the secret key is non-empty and at least
 +   * {@link OzoneConsts#S3_SECRET_KEY_MIN_LENGTH} characters long; and the
 +   * remote user is allowed to set the secret for this accessId.
 +   *
 +   * @param ozoneManager the OM handling the request
 +   * @return the request as received, unmodified
 +   * @throws IOException an {@link OMException} with ACCESS_ID_NOT_FOUND,
 +   *     INVALID_REQUEST or PERMISSION_DENIED when a check fails
 +   */
 +  @Override
 +  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
 +    final OMMetadataManager metadataManager =
 +        ozoneManager.getMetadataManager();
 +    final SetS3SecretRequest setSecretRequest =
 +        getOmRequest().getSetS3SecretRequest();
 +    final String accessId = setSecretRequest.getAccessId();
 +
 +    // The accessId has to exist in the tenant accessId table or, failing
 +    // that, in the (old) S3SecretTable.
 +    final boolean knownAccessId =
 +        metadataManager.getTenantAccessIdTable().get(accessId) != null
 +            || metadataManager.getS3SecretTable().get(accessId) != null;
 +    if (!knownAccessId) {
 +      throw new OMException("accessId '" + accessId + "' not found.",
 +          OMException.ResultCodes.ACCESS_ID_NOT_FOUND);
 +    }
 +
 +    // Secret should not be empty
 +    final String newSecret = setSecretRequest.getSecretKey();
 +    if (StringUtils.isEmpty(newSecret)) {
 +      throw new OMException("Secret key should not be empty",
 +          OMException.ResultCodes.INVALID_REQUEST);
 +    }
 +
 +    final int minLength = OzoneConsts.S3_SECRET_KEY_MIN_LENGTH;
 +    if (newSecret.length() < minLength) {
 +      throw new OMException("Secret key length should be at least " +
 +          minLength + " characters",
 +          OMException.ResultCodes.INVALID_REQUEST);
 +    }
 +
 +    // TODO: Check if secretKey matches other requirements? e.g. combination
 +
 +    final UserGroupInformation remoteUser =
 +        ProtobufRpcEngine.Server.getRemoteUser();
 +    final String username = remoteUser.getUserName();
 +
 +    // Permission check. Any one of the following is sufficient, evaluated
 +    // in this order and stopping at the first that holds:
 +    // 1. username matches accessId exactly
 +    // 2. user is an OM admin
 +    // 3. user is assigned to a tenant under this accessId
 +    // 4. user is an admin of the tenant where the accessId is assigned
 +    // (3 and 4 are both covered by the multi-tenant manager lookup.)
 +    final boolean permitted = username.equals(accessId)
 +        || ozoneManager.isAdmin(remoteUser)
 +        || ozoneManager.getMultiTenantManager()
 +            .isUserAccessIdPrincipalOrTenantAdmin(accessId, remoteUser);
 +    if (!permitted) {
 +      throw new OMException("Permission denied. Requested accessId '" +
 +          accessId + "' and user doesn't satisfy any of:\n" +
 +          "1) accessId match current username: '" + username + "';\n" +
 +          "2) is an OM admin;\n" +
 +          "3) user is assigned to a tenant under this accessId;\n" +
 +          "4) user is an admin of the tenant where the accessId is " +
 +          "assigned", OMException.ResultCodes.PERMISSION_DENIED);
 +    }
 +
 +    return getOmRequest();
 +  }
 +
 +  /**
 +   * Applies the SetS3Secret request: while holding the S3 secret write lock
 +   * for the accessId, replaces the S3SecretTable cache entry with the new
 +   * secret, adds the response to the double buffer and emits an audit log
 +   * entry.
 +   * <p>
 +   * Only an accessId that already has an S3SecretTable entry is updated;
 +   * otherwise an error response carrying ACCESS_ID_NOT_FOUND is produced.
 +   *
 +   * @param ozoneManager the OM applying the request
 +   * @param transactionLogIndex index recorded with the cache entry and with
 +   *     the response added to the double buffer
 +   * @param ozoneManagerDoubleBufferHelper helper the response is handed to
 +   * @return the success or error response for this request
 +   */
 +  @Override
 +  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
 +      long transactionLogIndex,
 +      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 +
 +    OMClientResponse omClientResponse = null;
 +    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
 +        getOmRequest());
 +    boolean acquiredLock = false;
 +    IOException exception = null;
 +    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 +
 +    final SetS3SecretRequest request = getOmRequest().getSetS3SecretRequest();
 +    final String accessId = request.getAccessId();
 +    final String secretKey = request.getSecretKey();
 +
 +    try {
 +      acquiredLock = omMetadataManager.getLock().acquireWriteLock(
 +          S3_SECRET_LOCK, accessId);
 +
 +      // Only the legacy S3SecretTable is updated here. If it has no entry
 +      // for the accessId, report ACCESS_ID_NOT_FOUND.
 +      if (omMetadataManager.getS3SecretTable().get(accessId) == null) {
 +        throw new OMException("accessId '" + accessId + "' not found.",
 +            OMException.ResultCodes.ACCESS_ID_NOT_FOUND);
 +      }
 +
 +      // accessId found in S3SecretTable. Update its cache entry.
 +      LOG.debug("Updating S3SecretTable cache entry");
 +      final S3SecretValue newS3SecretValue =
 +          new S3SecretValue(accessId, secretKey);
 +
 +      omMetadataManager.getS3SecretTable().addCacheEntry(
 +          new CacheKey<>(accessId),
 +          new CacheValue<>(Optional.of(newS3SecretValue),
 +              transactionLogIndex));
 +
 +      // Compose response
 +      final SetS3SecretResponse.Builder setSecretResponse =
 +          SetS3SecretResponse.newBuilder()
 +              .setAccessId(accessId)
 +              .setSecretKey(secretKey);
 +
 +      omClientResponse = new OMSetSecretResponse(accessId, newS3SecretValue,
 +          omResponse.setSetS3SecretResponse(setSecretResponse).build());
 +
 +    } catch (IOException ex) {
 +      exception = ex;
 +      omClientResponse = new OMSetSecretResponse(
 +          createErrorOMResponse(omResponse, ex));
 +    } finally {
 +      // The response is queued before the lock is released.
 +      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
 +          ozoneManagerDoubleBufferHelper);
 +      if (acquiredLock) {
 +        omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK,
 +            accessId);
 +      }
 +    }
 +
 +    final Map<String, String> auditMap = new HashMap<>();
 +    auditMap.put(OzoneConsts.S3_SETSECRET_USER, accessId);
 +
 +    // audit log
 +    auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
 +        OMAction.SET_S3_SECRET, auditMap,
 +        exception, getOmRequest().getUserInfo()));
 +
 +    if (exception == null) {
 +      LOG.debug("Success: SetSecret for accessKey '{}'", accessId);
 +    } else {
 +      // Pass the exception as the trailing throwable argument, without a
 +      // placeholder of its own, so the logger records it with its stack
 +      // trace instead of formatting it into the message.
 +      LOG.error("Failed to SetSecret for accessKey '{}'",
 +          accessId, exception);
 +    }
 +    return omClientResponse;
 +  }
 +
 +}
diff --cc 
hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 049ad61738,b6a928f289..32285d282c
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@@ -22,8 -24,16 +24,15 @@@ import javax.ws.rs.core.Context
  import java.io.IOException;
  import java.util.Collections;
  import java.util.Iterator;
+ import java.util.Map;
  import java.util.function.Function;
  
 -import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+ import org.apache.hadoop.ozone.audit.AuditAction;
+ import org.apache.hadoop.ozone.audit.AuditEventStatus;
+ import org.apache.hadoop.ozone.audit.AuditLogger;
+ import org.apache.hadoop.ozone.audit.AuditLoggerType;
+ import org.apache.hadoop.ozone.audit.AuditMessage;
+ import org.apache.hadoop.ozone.audit.Auditor;
  import org.apache.hadoop.ozone.client.OzoneBucket;
  import org.apache.hadoop.ozone.client.OzoneClient;
  import org.apache.hadoop.ozone.client.OzoneVolume;
diff --cc 
hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index fc727acb89,a852a3c2ab..838e86ea85
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@@ -189,9 -203,10 +205,10 @@@ public class ObjectEndpoint extends End
  
        if (copyHeader != null) {
          //Copy object, as copy source available.
+         s3GAction = S3GAction.COPY_OBJECT;
          CopyObjectResponse copyObjectResponse = copyObject(
 -            copyHeader, bucketName, keyPath, replicationType,
 -            replicationFactor, storageTypeDefault);
 +            copyHeader, bucketName, keyPath, replicationConfig,
 +            storageTypeDefault);
          return Response.status(Status.OK).entity(copyObjectResponse).header(
              "Connection", "close").build();
        }
diff --cc 
hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
index 0000000000,a598bdea60..5be8092e48
mode 000000,100644..100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
@@@ -1,0 -1,156 +1,158 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  */
+ package org.apache.hadoop.ozone.s3;
+ 
+ import org.apache.commons.io.FileUtils;
+ import org.apache.commons.lang3.RandomStringUtils;
++import org.apache.hadoop.hdds.client.ReplicationConfig;
+ import org.apache.hadoop.hdds.client.ReplicationFactor;
+ import org.apache.hadoop.hdds.client.ReplicationType;
+ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+ import org.apache.hadoop.ozone.OzoneConsts;
+ import org.apache.hadoop.ozone.client.OzoneBucket;
+ import org.apache.hadoop.ozone.client.OzoneClient;
+ import org.apache.hadoop.ozone.client.OzoneClientStub;
+ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+ import org.apache.hadoop.ozone.s3.endpoint.BucketEndpoint;
+ import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
+ import org.apache.hadoop.ozone.s3.endpoint.RootEndpoint;
+ import org.junit.AfterClass;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.HashMap;
+ import java.util.List;
+ 
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ import static org.junit.Assert.assertEquals;
+ 
+ /**
+  * Tests for S3Gateway Audit Log.
+  */
+ public class TestS3GatewayAuditLog {
+ 
+   private static final Logger LOG =
+       LoggerFactory.getLogger(TestS3GatewayAuditLog.class.getName());
+ 
+   static {
+     System.setProperty("log4j.configurationFile", "auditlog.properties");
+     System.setProperty("log4j2.contextSelector",
+         "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+   }
+ 
+   private String bucketName = OzoneConsts.BUCKET;
+   private OzoneClient clientStub;
+   private BucketEndpoint bucketEndpoint;
+   private RootEndpoint rootEndpoint;
+   private ObjectEndpoint keyEndpoint;
+   private OzoneBucket bucket;
+ 
+   @Before
+   public void setup() throws Exception {
+ 
+     clientStub = new OzoneClientStub();
+     clientStub.getObjectStore().createS3Bucket(bucketName);
+     bucket = clientStub.getObjectStore().getS3Bucket(bucketName);
+ 
+     bucketEndpoint = new BucketEndpoint();
+     bucketEndpoint.setClient(clientStub);
+ 
+     rootEndpoint = new RootEndpoint();
+     rootEndpoint.setClient(clientStub);
+ 
+     keyEndpoint = new ObjectEndpoint();
+     keyEndpoint.setClient(clientStub);
+     keyEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+ 
+   }
+ 
+   @AfterClass
+   public static void tearDown() {
+     File file = new File("audit.log");
+     if (FileUtils.deleteQuietly(file)) {
+       LOG.info("{} has been deleted as all tests have completed.",
+           file.getName());
+     } else {
+       LOG.info("audit.log could not be deleted.");
+     }
+   }
+ 
+   @Test
+   public void testHeadBucket() throws Exception {
+     bucketEndpoint.head(bucketName);
+     String expected = "INFO  | S3GAudit | ? | user=null | ip=null | " +
+         "op=HEAD_BUCKET {bucket=bucket} | ret=SUCCESS";
+     verifyLog(expected);
+   }
+ 
+   @Test
+   public void testListBucket() throws Exception {
+ 
+     rootEndpoint.get().getEntity();
+     String expected = "INFO  | S3GAudit | ? | user=null | ip=null | " +
+         "op=LIST_S3_BUCKETS {} | ret=SUCCESS";
+     verifyLog(expected);
+   }
+ 
+   @Test
+   public void testHeadObject() throws Exception {
+     String value = RandomStringUtils.randomAlphanumeric(32);
+     OzoneOutputStream out = bucket.createKey("key1",
 -        value.getBytes(UTF_8).length, ReplicationType.RATIS,
 -        ReplicationFactor.ONE, new HashMap<>());
++        value.getBytes(UTF_8).length,
++        ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
++        ReplicationFactor.ONE), new HashMap<>());
+     out.write(value.getBytes(UTF_8));
+     out.close();
+ 
+ 
+     keyEndpoint.head(bucketName, "key1");
+     String expected = "INFO  | S3GAudit | ? | user=null | ip=null | " +
+         "op=HEAD_KEY {bucket=bucket, keyPath=key1} | ret=SUCCESS";
+     verifyLog(expected);
+ 
+   }
+ 
+   private void verifyLog(String expectedString) throws IOException {
+     File file = new File("audit.log");
+     List<String> lines = FileUtils.readLines(file, (String)null);
+     final int retry = 5;
+     int i = 0;
+     while (lines.isEmpty() && i < retry) {
+       lines = FileUtils.readLines(file, (String)null);
+       try {
+         Thread.sleep(500 * (i + 1));
+       } catch (InterruptedException ie) {
+         Thread.currentThread().interrupt();
+         break;
+       }
+       i++;
+     }
+     assertEquals(lines.get(0), expectedString);
+ 
+     //empty the file
+     lines.clear();
+     FileUtils.writeLines(file, lines, false);
+   }
+ 
+ }
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
index bd3b4d0814,ca20dd89f7..1ae9ed91de
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
@@@ -211,10 -211,10 +211,10 @@@ public class TestPermissionCheck 
      bucketEndpoint.setClient(client);
      try {
        bucketEndpoint.get("bucketName", null, null, null, 1000,
-           null, null, null, null, null, "acl", null);
+           null, null, null, null, "acl", null);
 -    } catch (Exception e) {
 -      Assert.assertTrue(e instanceof OS3Exception &&
 -          ((OS3Exception)e).getHttpCode() == HTTP_FORBIDDEN);
 +      Assert.fail("Expected OS3Exception with FORBIDDEN http code.");
 +    } catch (OS3Exception e) {
 +      Assert.assertEquals(HTTP_FORBIDDEN, e.getHttpCode());
      }
    }
  
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
index 0000000000,ccf36a70c0..c8f421aae9
mode 000000,100644..100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
@@@ -1,0 -1,282 +1,284 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  */
+ package org.apache.hadoop.ozone.s3.metrics;
+ 
+ import org.apache.commons.lang3.RandomStringUtils;
++import org.apache.hadoop.hdds.client.ReplicationConfig;
+ import org.apache.hadoop.hdds.client.ReplicationFactor;
+ import org.apache.hadoop.hdds.client.ReplicationType;
+ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+ import org.apache.hadoop.ozone.OzoneConsts;
+ import org.apache.hadoop.ozone.client.OzoneBucket;
+ import org.apache.hadoop.ozone.client.OzoneClient;
+ import org.apache.hadoop.ozone.client.OzoneClientStub;
+ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+ import org.apache.hadoop.ozone.s3.endpoint.BucketEndpoint;
+ import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
+ import org.apache.hadoop.ozone.s3.endpoint.RootEndpoint;
+ import org.apache.hadoop.ozone.s3.endpoint.TestBucketAcl;
+ import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+ import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.mockito.Mockito;
+ 
+ import javax.ws.rs.core.HttpHeaders;
+ import javax.ws.rs.core.Response;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.HashMap;
+ 
+ import static java.net.HttpURLConnection.HTTP_OK;
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ /**
+  * Tests for {@link S3GatewayMetrics}.
+  */
+ public class TestS3GatewayMetrics {
+ 
+   private String bucketName = OzoneConsts.BUCKET;
+   private OzoneClient clientStub;
+   private BucketEndpoint bucketEndpoint;
+   private RootEndpoint rootEndpoint;
+   private ObjectEndpoint keyEndpoint;
+   private OzoneBucket bucket;
+   private HttpHeaders headers;
+   private static final String ACL_MARKER = "acl";
+   private S3GatewayMetrics metrics;
+ 
+ 
+   @Before
+   public void setup() throws Exception {
+     clientStub = new OzoneClientStub();
+     clientStub.getObjectStore().createS3Bucket(bucketName);
+     bucket = clientStub.getObjectStore().getS3Bucket(bucketName);
+ 
+     bucketEndpoint = new BucketEndpoint();
+     bucketEndpoint.setClient(clientStub);
+ 
+     rootEndpoint = new RootEndpoint();
+     rootEndpoint.setClient(clientStub);
+ 
+     keyEndpoint = new ObjectEndpoint();
+     keyEndpoint.setClient(clientStub);
+     keyEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+ 
+     headers = Mockito.mock(HttpHeaders.class);
+     metrics = bucketEndpoint.getMetrics();
+   }
+ 
+   @Test
+   public void testHeadBucket() throws Exception {
+ 
+     long oriMetric = metrics.getHeadBucketSuccess();
+ 
+     bucketEndpoint.head(bucketName);
+ 
+     long curMetric = metrics.getHeadBucketSuccess();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testListBucket() throws Exception {
+ 
+     long oriMetric = metrics.getListS3BucketsSuccess();
+ 
+     rootEndpoint.get().getEntity();
+ 
+     long curMetric = metrics.getListS3BucketsSuccess();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testHeadObject() throws Exception {
+     String value = RandomStringUtils.randomAlphanumeric(32);
+     OzoneOutputStream out = bucket.createKey("key1",
 -        value.getBytes(UTF_8).length, ReplicationType.RATIS,
 -        ReplicationFactor.ONE, new HashMap<>());
++        value.getBytes(UTF_8).length,
++        ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
++        ReplicationFactor.ONE), new HashMap<>());
+     out.write(value.getBytes(UTF_8));
+     out.close();
+ 
+     long oriMetric = metrics.getHeadKeySuccess();
+ 
+     keyEndpoint.head(bucketName, "key1");
+ 
+     long curMetric = metrics.getHeadKeySuccess();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testGetBucketSuccess() throws Exception {
+     long oriMetric = metrics.getGetBucketSuccess();
+ 
+     clientStub = createClientWithKeys("file1");
+     bucketEndpoint.setClient(clientStub);
+     bucketEndpoint.get(bucketName, null,
+         null, null, 1000, null,
+         null, "random", null,
+         null, null).getEntity();
+ 
+     long curMetric = metrics.getGetBucketSuccess();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testGetBucketFailure() throws Exception {
+     long oriMetric = metrics.getGetBucketFailure();
+ 
+     try {
+       // Searching for a bucket that does not exist
+       bucketEndpoint.get("newBucket", null,
+           null, null, 1000, null,
+           null, "random", null,
+           null, null);
+       fail();
+     } catch (OS3Exception e) {
+     }
+ 
+     long curMetric = metrics.getGetBucketFailure();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testCreateBucketSuccess() throws Exception {
+ 
+     long oriMetric = metrics.getCreateBucketSuccess();
+ 
+     bucketEndpoint.put(bucketName, null,
+         null, null);
+     long curMetric = metrics.getCreateBucketSuccess();
+ 
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testCreateBucketFailure() throws Exception {
+     // Creating an error by trying to create a bucket that already exists
+     long oriMetric = metrics.getCreateBucketFailure();
+ 
+     bucketEndpoint.put(bucketName, null, null, null);
+ 
+     long curMetric = metrics.getCreateBucketFailure();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testDeleteBucketSuccess() throws Exception {
+     long oriMetric = metrics.getDeleteBucketSuccess();
+ 
+     bucketEndpoint.delete(bucketName);
+ 
+     long curMetric = metrics.getDeleteBucketSuccess();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testDeleteBucketFailure() throws Exception {
+     long oriMetric = metrics.getDeleteBucketFailure();
+     bucketEndpoint.delete(bucketName);
+     try {
+       // Deleting a bucket that does not exist will result in delete failure
+       bucketEndpoint.delete(bucketName);
+       fail();
+     } catch (OS3Exception ex) {
+       assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getCode(), ex.getCode());
+       assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getErrorMessage(),
+           ex.getErrorMessage());
+     }
+ 
+     long curMetric = metrics.getDeleteBucketFailure();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testGetAclSuccess() throws Exception {
+     long oriMetric = metrics.getGetAclSuccess();
+ 
+     Response response =
+         bucketEndpoint.get(bucketName, null, null,
+             null, 0, null, null,
+             null, null, "acl", null);
+     long curMetric = metrics.getGetAclSuccess();
+     assertEquals(HTTP_OK, response.getStatus());
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testGetAclFailure() throws Exception {
+     long oriMetric = metrics.getGetAclFailure();
+     try {
+       // Failing the getACL endpoint by applying ACL on a non-Existent Bucket
+       bucketEndpoint.get("random_bucket", null,
+           null, null, 0, null,
+           null, null, null, "acl", null);
+       fail();
+     } catch (OS3Exception ex) {
+       assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getCode(), ex.getCode());
+       assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getErrorMessage(),
+           ex.getErrorMessage());
+     }
+     long curMetric = metrics.getGetAclFailure();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testPutAclSuccess() throws Exception {
+     long oriMetric = metrics.getPutAclSuccess();
+ 
+     clientStub.getObjectStore().createS3Bucket("b1");
+     InputStream inputBody = TestBucketAcl.class.getClassLoader()
+         .getResourceAsStream("userAccessControlList.xml");
+ 
+     bucketEndpoint.put("b1", ACL_MARKER, headers, inputBody);
+     inputBody.close();
+     long curMetric = metrics.getPutAclSuccess();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   @Test
+   public void testPutAclFailure() throws Exception {
+     // Failing the putACL endpoint by applying ACL on a non-Existent Bucket
+     long oriMetric = metrics.getPutAclFailure();
+ 
+     InputStream inputBody = TestBucketAcl.class.getClassLoader()
+         .getResourceAsStream("userAccessControlList.xml");
+ 
+     try {
+       bucketEndpoint.put("unknown_bucket", ACL_MARKER, headers, inputBody);
+       fail();
+     } catch (OS3Exception ex) {
+     } finally {
+       inputBody.close();
+     }
+     long curMetric = metrics.getPutAclFailure();
+     assertEquals(1L, curMetric - oriMetric);
+   }
+ 
+   private OzoneClient createClientWithKeys(String... keys) throws IOException {
+     OzoneBucket bkt = clientStub.getObjectStore().getS3Bucket(bucketName);
+     for (String key : keys) {
+       bkt.createKey(key, 0).close();
+     }
+     return clientStub;
+   }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to