This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-4944
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit f5581ce81932bb3c0f79b976ba5c76dd10d634db
Merge: 8d1d957 173f46a
Author: Siyao Meng <[email protected]>
AuthorDate: Fri Dec 10 10:03:16 2021 -0800

    Merge remote-tracking branch 'asf/master' into HDDS-4944
    
    Conflicts:
    
    hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
    hadoop-ozone/common/pom.xml
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
    hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
    hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
    hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
    
    Change-Id: I1d5b5b49ca07c44cfbf3b4886f3bdfe5082e8ce1

 .github/ci.md                                      |  102 ++
 CONTRIBUTING.md                                    |    4 +-
 hadoop-hdds/client/pom.xml                         |    4 +-
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |   38 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   52 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |    8 +
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |    2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    |   14 +-
 .../hdds/scm/storage/TestBlockInputStream.java     |   32 +-
 hadoop-hdds/common/pom.xml                         |    4 +-
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |  122 ++
 .../java/org/apache/hadoop/hdds/NodeDetails.java   |    0
 .../hadoop/hdds/client/RatisReplicationConfig.java |   27 +-
 .../hdds/client/ReplicatedReplicationConfig.java}  |   23 +-
 .../hadoop/hdds/client/ReplicationConfig.java      |  138 +-
 .../hdds/client/ReplicationConfigValidator.java    |    5 +-
 .../hdds/client/StandaloneReplicationConfig.java   |   23 +-
 .../hadoop/hdds/fs/CachingSpaceUsageSource.java    |   23 +-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |    1 +
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |    8 +-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |    6 +-
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |    7 +-
 .../hadoop/hdds/scm/container/ContainerInfo.java   |    7 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |   31 +-
 .../hadoop/hdds/scm/net/NetworkTopology.java       |    3 +-
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   |    5 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |    2 +-
 .../protocol/StorageContainerLocationProtocol.java |   20 +
 .../hdds/scm/storage/ContainerProtocolCalls.java   |    5 -
 .../hadoop/hdds/utils/ResourceLimitCache.java      |    7 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   12 +
 .../common/src/main/resources/ozone-default.xml    |   94 +-
 .../hadoop/hdds/client/TestReplicationConfig.java  |  280 ++--
 .../hadoop/hdds/utils/TestResourceLimitCache.java  |   46 +-
 hadoop-hdds/config/pom.xml                         |    4 +-
 hadoop-hdds/container-service/pom.xml              |    4 +-
 .../container/common/helpers/ContainerUtils.java   |   68 -
 .../container/common/impl/HddsDispatcher.java      |    4 +-
 .../common/statemachine/DatanodeConfiguration.java |   43 +
 .../common/statemachine/DatanodeStateMachine.java  |   13 +-
 .../common/statemachine/StateContext.java          |   14 +-
 .../CreatePipelineCommandHandler.java              |   10 +-
 .../RefreshVolumeUsageCommandHandler.java          |   73 ++
 .../common/transport/server/XceiverServerGrpc.java |   52 +-
 .../common/transport/server/ratis/CSMMetrics.java  |   10 +
 .../server/ratis/ContainerStateMachine.java        |   82 +-
 .../transport/server/ratis/XceiverServerRatis.java |   12 +-
 .../container/common/volume/MutableVolumeSet.java  |    4 +
 .../container/common/volume/StorageVolume.java     |    4 +
 .../common/volume/StorageVolumeChecker.java        |    1 +
 .../ozone/container/common/volume/VolumeInfo.java  |   13 +-
 .../ozone/container/common/volume/VolumeUsage.java |    3 +
 .../container/keyvalue/KeyValueContainer.java      |    4 +
 .../container/keyvalue/KeyValueContainerCheck.java |    2 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |   29 +-
 .../metadata/DatanodeSchemaOneDBDefinition.java    |    6 +-
 .../replication/DownloadAndImportReplicator.java   |    5 +-
 .../container/replication/MeasuredReplicator.java  |   34 +-
 .../replication/ReplicationSupervisor.java         |    9 +-
 .../replication/ReplicationSupervisorMetrics.java  |   71 +
 .../replication/SimpleContainerDownloader.java     |    7 +-
 .../common/helpers/TestContainerUtils.java         |    2 +-
 .../common/volume/TestStorageVolumeChecker.java    |    4 +
 .../replication/TestMeasuredReplicator.java        |   34 +-
 .../replication/TestReplicationSupervisor.java     |   59 +-
 hadoop-hdds/docs/content/concept/Containers.md     |    2 +-
 hadoop-hdds/docs/content/concept/Datanodes.md      |    4 +-
 hadoop-hdds/docs/content/concept/OzoneManager.md   |    6 +-
 hadoop-hdds/docs/content/concept/Recon.md          |    6 +-
 hadoop-hdds/docs/content/feature/OM-HA.md          |    5 +-
 hadoop-hdds/docs/content/feature/PrefixFSO.md      |   24 +-
 hadoop-hdds/docs/content/feature/SCM-HA.md         |    2 +-
 hadoop-hdds/docs/content/feature/Topology.md       |    2 +-
 hadoop-hdds/docs/content/interface/CSI.md          |    4 +-
 hadoop-hdds/docs/content/interface/CSI.zh.md       |    4 +-
 hadoop-hdds/docs/content/interface/Cli.md          |    7 +-
 .../docs/content/security/SecuringOzoneHTTP.md     |   20 +-
 hadoop-hdds/docs/content/start/FromSource.md       |    2 +-
 hadoop-hdds/docs/pom.xml                           |    4 +-
 .../docs/static/design/s3-performance-new.png      |  Bin 33688 -> 34927 bytes
 .../docs/static/design/s3-performance-old.png      |  Bin 34927 -> 33688 bytes
 .../themes/ozonedoc/layouts/partials/navbar.html   |    6 +-
 .../themes/ozonedoc/layouts/partials/sidebar.html  |    4 +-
 .../themes/ozonedoc/layouts/shortcodes/image.html  |   19 +
 hadoop-hdds/framework/pom.xml                      |    4 +-
 .../java/org/apache/hadoop/hdds/ExitManager.java   |   12 +-
 .../hadoop/hdds/conf/DatanodeRatisGrpcConfig.java  |    4 +-
 .../hdds/conf/DatanodeRatisServerConfig.java       |   16 +
 .../scm/protocol/ScmBlockLocationProtocol.java     |    2 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |   12 +-
 .../certificate/client/DNCertificateClient.java    |    5 -
 .../client/DefaultCertificateClient.java           |    2 +-
 .../certificate/client/OMCertificateClient.java    |    5 -
 .../certificate/client/SCMCertificateClient.java   |   10 +-
 .../hadoop/hdds/security/x509/keys/KeyCodec.java   |   31 +-
 .../org/apache/hadoop/hdds/server/JsonUtils.java   |   16 +-
 .../hadoop/hdds/utils/db/RDBStoreIterator.java     |   19 +-
 .../apache/hadoop/hdds/utils/db/TableIterator.java |   12 -
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |   18 -
 .../hdds/security/x509/keys/TestKeyCodec.java      |    5 +-
 .../hadoop/hdds/utils/db/TestRDBStoreIterator.java |   19 +-
 hadoop-hdds/hadoop-dependency-client/pom.xml       |    4 +-
 hadoop-hdds/hadoop-dependency-server/pom.xml       |    4 +-
 hadoop-hdds/hadoop-dependency-test/pom.xml         |    4 +-
 hadoop-hdds/interface-admin/pom.xml                |    4 +-
 .../src/main/proto/ScmAdminProtocol.proto          |    1 +
 .../interface-admin/src/main/resources/proto.lock  |  504 +++++++
 hadoop-hdds/interface-client/pom.xml               |    4 +-
 .../interface-client/src/main/proto/hdds.proto     |   11 +
 .../interface-client/src/main/resources/proto.lock |  408 +++++-
 hadoop-hdds/interface-server/pom.xml               |    4 +-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |    1 +
 .../interface-server/src/main/resources/proto.lock | 1024 +++++++++++++++
 hadoop-hdds/pom.xml                                |    4 +-
 hadoop-hdds/server-scm/pom.xml                     |    4 +-
 .../scm/block/DeletedBlockLogStateManagerImpl.java |   10 -
 .../scm/container/CloseContainerEventHandler.java  |   12 +-
 .../hdds/scm/container/ContainerManager.java       |    2 +-
 .../hdds/scm/container/ContainerManagerImpl.java   |    4 +-
 .../hdds/scm/container/ContainerStateManager.java  |  570 ++------
 .../scm/container/ContainerStateManagerImpl.java   |   19 +-
 .../scm/container/ContainerStateManagerV2.java     |  189 ---
 .../hdds/scm/container/ReplicationManager.java     |   43 +-
 .../scm/container/balancer/ContainerBalancer.java  |  338 ++---
 .../balancer/ContainerBalancerConfiguration.java   |  125 +-
 .../balancer/ContainerBalancerMetrics.java         |  115 +-
 .../ContainerBalancerSelectionCriteria.java        |   22 +-
 .../scm/container/balancer/FindSourceGreedy.java   |  158 +++
 .../scm/container/balancer/FindSourceStrategy.java |   67 +
 .../scm/container/balancer/FindTargetGreedy.java   |  121 +-
 .../scm/container/balancer/FindTargetStrategy.java |   32 +-
 .../algorithms/SCMContainerPlacementCapacity.java  |    2 +-
 .../algorithms/SCMContainerPlacementRackAware.java |   72 +-
 .../replication/ReplicationManagerMetrics.java     |   41 +
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |   11 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMContext.java  |   23 +-
 .../apache/hadoop/hdds/scm/ha/SCMRatisRequest.java |   24 +
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |    7 +
 .../hadoop/hdds/scm/ha/SequenceIdGenerator.java    |   26 +-
 .../hadoop/hdds/scm/node/DatanodeUsageInfo.java    |   30 +-
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |    8 +
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   13 +
 .../scm/pipeline/BackgroundPipelineCreator.java    |  266 +++-
 .../scm/pipeline/BackgroundPipelineCreatorV2.java  |  332 -----
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |    2 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   10 +
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |   54 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   30 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |    6 +-
 .../hdds/scm/pipeline/PipelineStateManager.java    |  271 ++--
 ...erV2Impl.java => PipelineStateManagerImpl.java} |   70 +-
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |   27 +
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |    2 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |    4 +-
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |    2 +-
 .../hadoop/hdds/scm/pipeline/StateManager.java     |  127 --
 .../algorithms/DefaultLeaderChoosePolicy.java      |    4 +-
 .../choose/algorithms/LeaderChoosePolicy.java      |    8 +-
 .../algorithms/LeaderChoosePolicyFactory.java      |    8 +-
 .../algorithms/MinLeaderCountChoosePolicy.java     |    6 +-
 ...lockLocationProtocolServerSideTranslatorPB.java |    2 +-
 ...inerLocationProtocolServerSideTranslatorPB.java |    6 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |   51 +-
 .../hdds/scm/server/StorageContainerManager.java   |   26 +-
 .../scm/server/upgrade/SCMUpgradeFinalizer.java    |    5 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |   18 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |   18 +-
 .../hdds/scm/container/SimpleMockNodeManager.java  |    5 +
 .../scm/container/TestContainerManagerImpl.java    |    6 +-
 .../scm/container/TestContainerReportHandler.java  |  267 ++--
 .../scm/container/TestContainerStateManager.java   |   91 +-
 .../TestIncrementalContainerReportHandler.java     |  126 +-
 .../hdds/scm/container/TestReplicationManager.java |  380 +++---
 .../scm/container/TestUnknownContainerReport.java  |   47 +-
 .../container/balancer/TestContainerBalancer.java  |  146 ++-
 .../hdds/scm/container/balancer}/package-info.java |   13 +-
 .../TestSCMContainerPlacementCapacity.java         |    6 +-
 .../TestSCMContainerPlacementRackAware.java        |   15 +
 .../hdds/scm/ha/TestReplicationAnnotation.java     |    8 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   31 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |    6 +
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   11 +-
 .../hdds/scm/pipeline/MockPipelineManager.java     |   33 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |    8 +-
 .../TestPipelineDatanodesIntersection.java         |   52 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |   85 +-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |   73 +-
 ...ager.java => TestPipelineStateManagerImpl.java} |  245 ++--
 .../scm/pipeline/TestRatisPipelineProvider.java    |   66 +-
 ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java |  209 ---
 .../scm/pipeline/TestSimplePipelineProvider.java   |   48 +-
 .../choose/algorithms/TestLeaderChoosePolicy.java  |    6 +-
 .../placement/TestContainerPlacement.java          |    4 +-
 .../testutils/ReplicationNodeManagerMock.java      |   11 +
 hadoop-hdds/test-utils/pom.xml                     |    4 +-
 hadoop-hdds/tools/pom.xml                          |    4 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |    5 +-
 .../hdds/scm/cli/container/ListSubcommand.java     |    6 +-
 .../hdds/scm/cli/datanode/UsageInfoSubcommand.java |   41 +-
 hadoop-ozone/client/pom.xml                        |    4 +-
 .../org/apache/hadoop/ozone/client/BucketArgs.java |   29 +-
 .../apache/hadoop/ozone/client/OzoneBucket.java    |   40 +-
 .../ozone/client/io/BlockOutputStreamEntry.java    |  268 ++--
 .../client/io/BlockOutputStreamEntryPool.java      |  109 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |   16 +-
 .../hadoop/ozone/client/io/OzoneInputStream.java   |   33 +-
 .../ozone/client/protocol/ClientProtocol.java      |   20 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  113 +-
 .../hadoop/ozone/client/rpc/RpcClientTest.java     |  217 ++++
 hadoop-ozone/common/pom.xml                        |   12 +-
 .../main/java/org/apache/hadoop/ozone/OFSPath.java |    3 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |  109 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   34 +-
 .../ozone/om/ha/OMFailoverProxyProvider.java       |   11 +-
 .../hadoop/ozone/om/helpers/BucketLayout.java      |   25 +
 .../hadoop/ozone/om/helpers/OMNodeDetails.java     |   50 +-
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      |   32 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |   27 +-
 .../ozone/om/helpers/OmMultipartKeyInfo.java       |    2 +-
 .../hadoop/ozone/om/helpers/OzoneFSUtils.java      |   24 +-
 .../hadoop/ozone/om/helpers/ServiceInfo.java       |   41 +-
 .../MultiTenantAccessAuthorizerRangerPlugin.java   |    2 +-
 .../hadoop/ozone/om/protocol/OMAdminProtocol.java  |   23 +-
 .../hadoop/ozone/om/protocol/OMConfiguration.java  |   92 ++
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  224 +++-
 .../apache/hadoop/ozone/om/protocol/S3Auth.java}   |   29 +-
 .../protocolPB/OMAdminProtocolClientSideImpl.java  |  136 ++
 .../ozone/om/protocolPB/OMAdminProtocolPB.java}    |   22 +-
 .../om/protocolPB/OzoneManagerClientProtocol.java  |   27 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   63 +-
 .../hadoop/ozone/om/helpers/TestOmKeyInfo.java     |   41 +-
 hadoop-ozone/csi/pom.xml                           |    4 +-
 .../org/apache/hadoop/ozone/csi/NodeService.java   |   10 +-
 hadoop-ozone/datanode/pom.xml                      |    4 +-
 hadoop-ozone/dist/README.md                        |    6 +-
 hadoop-ozone/dist/pom.xml                          |    4 +-
 hadoop-ozone/dist/src/main/compose/ozone/README.md |    6 -
 .../src/main/compose/ozone/docker-compose.yaml     |   10 -
 hadoop-ozone/dist/src/main/compose/ozone/run.sh    |    4 -
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |   29 +-
 .../src/main/compose/ozonesecure-mr/docker-config  |    3 +
 .../dist/src/main/compose/xcompat/docker-config    |    1 +
 hadoop-ozone/dist/src/main/k8s/examples/testlib.sh |    5 +-
 .../src/main/smoketest/admincli/container.robot    |    8 +
 .../dist/src/main/smoketest/basic/links.robot      |    8 +-
 .../src/main/smoketest/basic/ozone-shell-lib.robot |   10 +-
 .../main/smoketest/basic/ozone-shell-single.robot  |    2 +-
 .../src/main/smoketest/basic/ozone-shell.robot     |   15 +-
 .../dist/src/main/smoketest/createmrenv.robot      |    2 +-
 .../dist/src/main/smoketest/freon/generate.robot   |    9 +
 .../dist/src/main/smoketest/freon/validate.robot   |    9 +
 .../dist/src/main/smoketest/omha/testOMHA.robot    |    2 +-
 .../dist/src/main/smoketest/ozonefs/ozonefs.robot  |   18 +-
 .../dist/src/main/smoketest/ozonefs/setup.robot    |   12 +-
 .../src/main/smoketest/s3/MultipartUpload.robot    |   23 +-
 .../dist/src/main/smoketest/s3/objectdelete.robot  |    6 +-
 .../main/smoketest/security/ozone-secure-fs.robot  |   12 +-
 .../fault-injection-test/mini-chaos-tests/pom.xml  |    4 +-
 .../fault-injection-test/network-tests/pom.xml     |    2 +-
 hadoop-ozone/fault-injection-test/pom.xml          |    4 +-
 hadoop-ozone/insight/pom.xml                       |    4 +-
 .../dev-support/findbugsExcludeFile.xml            |    4 +
 hadoop-ozone/integration-test/pom.xml              |    4 +-
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |   19 +-
 .../hadoop/fs/ozone/TestOzoneFSInputStream.java    |    4 +
 .../fs/ozone/TestOzoneFSWithObjectStoreCreate.java |    3 +
 .../hadoop/fs/ozone/TestOzoneFileInterfaces.java   |   85 +-
 .../fs/ozone/TestOzoneFileInterfacesWithFSO.java   |   25 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |   47 +-
 .../fs/ozone/TestOzoneFileSystemMetrics.java       |    3 +
 .../fs/ozone/TestOzoneFileSystemMissingParent.java |    2 +
 .../fs/ozone/TestOzoneFileSystemPrefixParser.java  |    9 +-
 .../fs/ozone/TestOzoneFileSystemWithFSO.java       |    9 +-
 .../fs/ozone/TestOzoneFileSystemWithLinks.java     |  248 ++++
 .../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java  |    5 +
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |  161 ++-
 .../fs/ozone/TestRootedOzoneFileSystemWithFSO.java |   12 +-
 .../hadoop/fs/ozone/contract/OzoneContract.java    |    5 +-
 .../TestContainerStateManagerIntegration.java      |    2 +-
 .../hdds/scm/pipeline/TestMultiRaftSetup.java      |    4 +-
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       |    2 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   19 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |    7 +
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |  150 ++-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |   16 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |    5 +
 .../hadoop/ozone/TestStandardOutputUtil.java       |   84 ++
 .../rpc/TestBlockOutputStreamWithFailures.java     |   20 +-
 ...estBlockOutputStreamWithFailuresFlushDelay.java |   20 +-
 .../rpc/TestContainerStateMachineFailures.java     |   20 +-
 .../client/rpc/TestOzoneAtRestEncryption.java      |   11 +
 .../rpc/TestOzoneClientMultipartUploadWithFSO.java |   31 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  140 +-
 .../hadoop/ozone/client/rpc/TestReadRetries.java   |   14 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |  100 +-
 .../commandhandler/TestBlockDeletion.java          |   10 +-
 .../ozone/freon/TestHadoopDirTreeGenerator.java    |    5 +-
 .../freon/TestHadoopDirTreeGeneratorWithFSO.java   |    4 +-
 .../ozone/freon/TestHadoopNestedDirGenerator.java  |    3 +
 .../apache/hadoop/ozone/om/TestBucketOwner.java    |  244 ++++
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |   44 +-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      |   10 +-
 .../ozone/om/TestOMStartupWithBucketLayout.java    |  161 +++
 .../apache/hadoop/ozone/om/TestObjectStore.java    |  233 ++++
 .../hadoop/ozone/om/TestObjectStoreWithFSO.java    |   26 +-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |    6 +-
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |   27 +-
 .../org/apache/hadoop/ozone/om/TestOmLDBCli.java   |   64 +-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |  273 ++--
 .../hadoop/ozone/om/TestOzoneManagerBootstrap.java |  184 ++-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  113 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |   22 +-
 .../ozone/om/TestOzoneManagerHAWithData.java       |   98 +-
 .../ozone/om/TestOzoneManagerHAWithFailover.java   |   65 +
 .../hadoop/ozone/om/TestOzoneManagerPrepare.java   |   28 +-
 .../hadoop/ozone/om/TestRecursiveAclWithFSO.java   |    3 +-
 .../om/ratis/TestOzoneManagerRatisRequest.java     |   71 +
 .../TestOzoneHARatisLogParser.java}                |   52 +-
 .../ozone/recon/TestReconWithOzoneManager.java     |   19 +-
 .../ozone/recon/TestReconWithOzoneManagerFSO.java  |    6 +-
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |  123 +-
 .../hadoop/ozone/shell/TestOzoneShellHA.java       |  141 +-
 hadoop-ozone/interface-client/pom.xml              |    4 +-
 .../src/main/proto/OMAdminProtocol.proto           |   65 +
 .../src/main/proto/OmClientProtocol.proto          |   59 +-
 .../interface-client/src/main/resources/proto.lock |  826 +++++++++++-
 hadoop-ozone/interface-storage/pom.xml             |    4 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |    6 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |    5 +-
 hadoop-ozone/ozone-manager/pom.xml                 |    4 +-
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |   26 +-
 .../hadoop/ozone/om/DirectoryDeletingService.java  |   21 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  779 +++++++----
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |   14 +
 .../apache/hadoop/ozone/om/OMPolicyProvider.java   |    5 +-
 .../apache/hadoop/ozone/om/OMStarterInterface.java |    2 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   29 +-
 .../org/apache/hadoop/ozone/om/OzoneAclUtils.java  |  125 ++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 1374 ++++++--------------
 .../hadoop/ozone/om/OzoneManagerStarter.java       |   59 +-
 .../apache/hadoop/ozone/om/OzoneManagerUtils.java  |  180 +++
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |    7 +-
 .../apache/hadoop/ozone/om/TrashPolicyOzone.java   |    1 +
 .../org/apache/hadoop/ozone/om/VolumeManager.java  |   40 +-
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |  300 -----
 .../apache/hadoop/ozone/om/ha/OMHANodeDetails.java |    6 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |   42 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |  185 +--
 .../hadoop/ozone/om/request/OMClientRequest.java   |  114 +-
 .../ozone/om/request/OMKeyRequestFactory.java      |  139 ++
 .../om/request/bucket/OMBucketCreateRequest.java   |   12 +-
 .../om/request/file/OMDirectoryCreateRequest.java  |   27 +-
 .../file/OMDirectoryCreateRequestWithFSO.java      |    6 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |   22 +-
 .../request/file/OMFileCreateRequestWithFSO.java   |    7 +-
 .../ozone/om/request/file/OMFileRequest.java       |  140 +-
 .../om/request/key/OMAllocateBlockRequest.java     |   38 +-
 .../request/key/OMAllocateBlockRequestWithFSO.java |    6 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  108 +-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |   35 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   39 +-
 .../om/request/key/OMKeyCreateRequestWithFSO.java  |    6 +-
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |   88 +-
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |    8 +-
 .../ozone/om/request/key/OMKeyRenameRequest.java   |   30 +-
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |   12 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   73 +-
 .../ozone/om/request/key/OMKeysDeleteRequest.java  |    5 +-
 .../ozone/om/request/key/OMKeysRenameRequest.java  |    9 +-
 .../om/request/key/OMPathsPurgeRequestWithFSO.java |    3 +-
 .../om/request/key/OMTrashRecoverRequest.java      |    2 +-
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |   58 +-
 .../om/request/key/acl/OMKeyAclRequestWithFSO.java |   10 +-
 .../om/request/key/acl/OMKeyAddAclRequest.java     |    5 +-
 .../request/key/acl/OMKeyAddAclRequestWithFSO.java |    5 +-
 .../om/request/key/acl/OMKeyRemoveAclRequest.java  |    4 +-
 .../key/acl/OMKeyRemoveAclRequestWithFSO.java      |    6 +-
 .../om/request/key/acl/OMKeySetAclRequest.java     |    4 +-
 .../request/key/acl/OMKeySetAclRequestWithFSO.java |    5 +-
 .../S3InitiateMultipartUploadRequest.java          |   33 +-
 .../S3InitiateMultipartUploadRequestWithFSO.java   |    8 +-
 .../multipart/S3MultipartUploadAbortRequest.java   |   49 +-
 .../S3MultipartUploadAbortRequestWithFSO.java      |    6 +-
 .../S3MultipartUploadCommitPartRequest.java        |   38 +-
 .../S3MultipartUploadCommitPartRequestWithFSO.java |    7 +-
 .../S3MultipartUploadCompleteRequest.java          |   62 +-
 .../S3MultipartUploadCompleteRequestWithFSO.java   |    6 +-
 .../ozone/om/request/upgrade/OMPrepareRequest.java |    9 +-
 .../hadoop/ozone/om/response/OMClientResponse.java |   26 +
 .../response/file/OMDirectoryCreateResponse.java   |    6 +-
 .../response/file/OMFileCreateResponseWithFSO.java |    5 +
 .../response/key/AbstractOMKeyDeleteResponse.java  |   11 +-
 .../om/response/key/OMAllocateBlockResponse.java   |    4 +-
 .../key/OMAllocateBlockResponseWithFSO.java        |    6 +
 .../ozone/om/response/key/OMKeyCommitResponse.java |   31 +-
 .../response/key/OMKeyCommitResponseWithFSO.java   |   21 +-
 .../ozone/om/response/key/OMKeyCreateResponse.java |   11 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |    5 +-
 .../response/key/OMKeyDeleteResponseWithFSO.java   |    9 +-
 .../ozone/om/response/key/OMKeyRenameResponse.java |   25 +-
 .../response/key/OMKeyRenameResponseWithFSO.java   |   18 +-
 .../om/response/key/OMKeysDeleteResponse.java      |    3 +-
 .../om/response/key/OMKeysRenameResponse.java      |   11 +-
 .../om/response/key/OMOpenKeysDeleteRequest.java   |    5 +-
 .../om/response/key/OMOpenKeysDeleteResponse.java  |    3 +-
 .../response/key/OMPathsPurgeResponseWithFSO.java  |   11 +-
 .../om/response/key/OMTrashRecoverResponse.java    |   10 +-
 .../ozone/om/response/key/OmKeyResponse.java       |   71 +
 .../om/response/key/acl/OMKeyAclResponse.java      |    4 +-
 .../response/key/acl/OMKeyAclResponseWithFSO.java  |    4 +-
 .../S3InitiateMultipartUploadResponse.java         |    4 +-
 .../multipart/S3MultipartUploadAbortResponse.java  |    4 +-
 .../S3MultipartUploadAbortResponseWithFSO.java     |    6 +
 .../S3MultipartUploadCommitPartResponse.java       |    4 +-
 ...S3MultipartUploadCommitPartResponseWithFSO.java |    6 +
 .../S3MultipartUploadCompleteResponse.java         |   38 +-
 .../protocolPB/OMAdminProtocolServerSideImpl.java  |   66 +
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   63 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   39 +-
 .../hadoop/ozone/protocolPB/RequestHandler.java    |    4 +-
 .../security/OzoneBlockTokenSecretManager.java     |    5 -
 .../hadoop/ozone/security/S3SecurityUtil.java      |   82 ++
 .../hadoop/ozone/om/TestBucketManagerImpl.java     |   63 +-
 .../hadoop/ozone/om/TestOmMetadataManager.java     |    7 +-
 .../hadoop/ozone/om/TestOzoneManagerStarter.java   |    4 +-
 .../ozone/om/request/TestOMRequestUtils.java       |  139 +-
 .../ozone/om/request/bucket/TestBucketRequest.java |    1 -
 .../bucket/TestOMBucketCreateRequestWithFSO.java   |    8 +-
 .../request/file/TestOMDirectoryCreateRequest.java |   53 +-
 .../file/TestOMDirectoryCreateRequestWithFSO.java  |   94 +-
 .../om/request/file/TestOMFileCreateRequest.java   |    8 +-
 .../file/TestOMFileCreateRequestWithFSO.java       |   26 +-
 .../om/request/key/TestOMAllocateBlockRequest.java |    7 +-
 .../key/TestOMAllocateBlockRequestWithFSO.java     |   25 +-
 .../ozone/om/request/key/TestOMKeyAclRequest.java  |   40 +-
 .../om/request/key/TestOMKeyAclRequestWithFSO.java |   29 +-
 .../om/request/key/TestOMKeyCommitRequest.java     |  131 +-
 .../request/key/TestOMKeyCommitRequestWithFSO.java |   19 +-
 .../om/request/key/TestOMKeyCreateRequest.java     |   70 +-
 .../request/key/TestOMKeyCreateRequestWithFSO.java |   18 +-
 .../om/request/key/TestOMKeyDeleteRequest.java     |   26 +-
 .../request/key/TestOMKeyDeleteRequestWithFSO.java |   26 +-
 .../om/request/key/TestOMKeyRenameRequest.java     |    8 +-
 .../ozone/om/request/key/TestOMKeyRequest.java     |   10 +-
 .../om/request/key/TestOMKeysDeleteRequest.java    |    4 +-
 .../om/request/key/TestOMKeysRenameRequest.java    |   20 +-
 .../request/key/TestOMOpenKeysDeleteRequest.java   |   10 +-
 .../TestS3InitiateMultipartUploadRequest.java      |   30 +-
 ...estS3InitiateMultipartUploadRequestWithFSO.java |   14 +-
 .../s3/multipart/TestS3MultipartRequest.java       |   21 +-
 .../TestS3MultipartUploadAbortRequest.java         |    5 +-
 .../TestS3MultipartUploadAbortRequestWithFSO.java  |    7 +-
 .../TestS3MultipartUploadCommitPartRequest.java    |   17 +-
 ...tS3MultipartUploadCommitPartRequestWithFSO.java |   19 +-
 .../TestS3MultipartUploadCompleteRequest.java      |   19 +-
 ...estS3MultipartUploadCompleteRequestWithFSO.java |   28 +-
 .../ozone/om/response/TestCleanupTableInfo.java    |    2 +
 .../file/TestOMDirectoryCreateResponse.java        |    7 +-
 .../file/TestOMFileCreateResponseWithFSO.java      |   13 +-
 .../response/key/TestOMAllocateBlockResponse.java  |   12 +-
 .../key/TestOMAllocateBlockResponseWithFSO.java    |   14 +-
 .../om/response/key/TestOMKeyCommitResponse.java   |   46 +-
 .../key/TestOMKeyCommitResponseWithFSO.java        |   18 +-
 .../om/response/key/TestOMKeyCreateResponse.java   |   12 +-
 .../key/TestOMKeyCreateResponseWithFSO.java        |   19 +-
 .../om/response/key/TestOMKeyDeleteResponse.java   |   18 +-
 .../key/TestOMKeyDeleteResponseWithFSO.java        |   14 +-
 .../om/response/key/TestOMKeyRenameResponse.java   |   36 +-
 .../ozone/om/response/key/TestOMKeyResponse.java   |    8 +
 .../om/response/key/TestOMKeysDeleteResponse.java  |    9 +-
 .../om/response/key/TestOMKeysRenameResponse.java  |   16 +-
 .../response/key/TestOMOpenKeysDeleteResponse.java |   18 +-
 .../TestS3InitiateMultipartUploadResponse.java     |    3 +-
 ...stS3InitiateMultipartUploadResponseWithFSO.java |    9 +-
 .../s3/multipart/TestS3MultipartResponse.java      |   15 +-
 .../TestS3MultipartUploadAbortResponse.java        |   11 +-
 ...S3MultipartUploadCommitPartResponseWithFSO.java |   15 +-
 ...stS3MultipartUploadCompleteResponseWithFSO.java |  100 +-
 .../hadoop/ozone/security/acl/TestParentAcl.java   |   21 +-
 hadoop-ozone/ozonefs-common/pom.xml                |    4 +-
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |   18 +-
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |   39 +-
 .../fs/ozone/BasicRootedOzoneFileSystem.java       |    4 +-
 .../apache/hadoop/fs/ozone/OzoneClientUtils.java   |   67 +
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |    4 +-
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |    4 +-
 hadoop-ozone/ozonefs-shaded/pom.xml                |    6 +-
 hadoop-ozone/ozonefs/pom.xml                       |    4 +-
 hadoop-ozone/pom.xml                               |    6 +-
 hadoop-ozone/recon-codegen/pom.xml                 |    2 +-
 .../recon/schema/ContainerSchemaDefinition.java    |    3 +-
 hadoop-ozone/recon/pom.xml                         |    2 +-
 .../hadoop/ozone/recon/api/ContainerEndpoint.java  |    9 +-
 .../codec/ContainerReplicaHistoryListCodec.java    |   45 +-
 .../ozone/recon/fsck/ContainerHealthStatus.java    |   17 +-
 .../ozone/recon/fsck/ContainerHealthTask.java      |    3 +
 .../persistence/ContainerHealthSchemaManager.java  |   12 +-
 .../ozone/recon/persistence/ContainerHistory.java  |    9 +-
 .../ozone/recon/scm/ContainerReplicaHistory.java   |   27 +-
 .../recon/scm/ContainerReplicaHistoryList.java     |   32 +-
 .../hadoop/ozone/recon/scm/PipelineSyncTask.java   |    3 +
 .../ozone/recon/scm/ReconContainerManager.java     |   21 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |   10 +-
 .../scm/ReconStorageContainerManagerFacade.java    |    5 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |    4 +-
 .../impl/ReconContainerMetadataManagerImpl.java    |    4 +-
 .../ozone/recon/spi/impl/ReconDBDefinition.java    |   13 +-
 .../ozone/recon/tasks/ContainerKeyMapperTask.java  |    8 +-
 .../ozone/recon/tasks/FileSizeCountTask.java       |    8 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |   25 +-
 .../ozone/recon/api/TestContainerEndpoint.java     |   34 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |    7 +-
 .../ozone/recon/api/TestNSSummaryEndpoint.java     |    3 -
 .../ozone/recon/fsck/TestContainerHealthTask.java  |   16 +-
 .../recovery/TestReconOmMetadataManagerImpl.java   |   25 +-
 .../ozone/recon/scm/TestReconContainerManager.java |   13 +-
 .../ozone/recon/scm/TestReconPipelineManager.java  |    5 +
 .../impl/TestOzoneManagerServiceProviderImpl.java  |   17 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |    9 +-
 .../ozone/recon/tasks/TestFileSizeCountTask.java   |    9 +-
 .../ozone/recon/tasks/TestNSSummaryTask.java       |   16 +-
 .../ozone/recon/tasks/TestOMDBUpdatesHandler.java  |   30 +-
 hadoop-ozone/s3gateway/pom.xml                     |    4 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  103 +-
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |    5 +
 .../ozone/s3/endpoint/CopyObjectResponse.java      |    2 +-
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |   29 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |   59 +-
 .../hadoop/ozone/s3/endpoint/RootEndpoint.java     |    5 +
 .../hadoop/ozone/s3/io/S3WrapperInputStream.java   |   84 --
 .../apache/hadoop/ozone/s3/util/S3StorageType.java |   22 +-
 .../hadoop/ozone/s3/TestOzoneClientProducer.java   |    4 +-
 .../s3/endpoint/TestAbortMultipartUpload.java      |    2 +
 .../s3/endpoint/TestInitiateMultipartUpload.java   |    2 +
 .../hadoop/ozone/s3/endpoint/TestListParts.java    |    2 +
 .../s3/endpoint/TestMultipartUploadComplete.java   |    2 +
 .../s3/endpoint/TestMultipartUploadWithCopy.java   |  221 +++-
 .../hadoop/ozone/s3/endpoint/TestObjectDelete.java |    2 +
 .../hadoop/ozone/s3/endpoint/TestObjectGet.java    |    2 +
 .../hadoop/ozone/s3/endpoint/TestObjectHead.java   |    2 +
 .../hadoop/ozone/s3/endpoint/TestObjectPut.java    |    2 +
 .../hadoop/ozone/s3/endpoint/TestPartUpload.java   |    2 +
 .../ozone/s3/endpoint/TestPermissionCheck.java     |    4 +
 hadoop-ozone/tools/pom.xml                         |    4 +-
 .../ozone/admin/nssummary/DiskUsageSubCommand.java |    2 +-
 .../admin/nssummary/FileSizeDistSubCommand.java    |    2 +-
 .../ozone/admin/nssummary/NSSummaryAdmin.java      |   36 +-
 .../ozone/admin/nssummary/NSSummaryCLIUtils.java   |    8 +-
 .../admin/nssummary/QuotaUsageSubCommand.java      |    2 +-
 .../ozone/admin/nssummary/SummarySubCommand.java   |    2 +-
 .../ozone/admin/om/FinalizeUpgradeSubCommand.java  |    5 +-
 .../admin/scm/FinalizeScmUpgradeSubcommand.java    |    5 +-
 .../hadoop/ozone/debug/DBDefinitionFactory.java    |   17 +-
 .../org/apache/hadoop/ozone/debug/DBScanner.java   |   80 +-
 .../apache/hadoop/ozone/debug/PrefixParser.java    |   26 +-
 .../ozone/debug/container/ContainerCommands.java   |   16 +-
 .../ozone/freon/ClosedContainerReplicator.java     |   10 +-
 .../hadoop/ozone/freon/DatanodeChunkGenerator.java |    8 +-
 .../hadoop/ozone/freon/DatanodeChunkValidator.java |  155 +--
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |    3 +-
 .../hadoop/ozone/freon/RandomKeyGenerator.java     |   54 +-
 .../hadoop/ozone/freon/SCMThroughputBenchmark.java |  908 +++++++++++++
 .../containergenerator/GeneratorDatanode.java      |   23 +-
 .../apache/hadoop/ozone/fsck/ContainerMapper.java  |    7 +-
 .../ozone/genesis/BenchMarkContainerStateMap.java  |    2 +-
 .../ozone/genesis/BenchMarkOMKeyAllocation.java    |  137 --
 .../org/apache/hadoop/ozone/genesis/Genesis.java   |    2 +-
 .../hadoop/ozone/segmentparser/RatisLogParser.java |    4 +-
 .../ozone/segmentparser/SCMRatisLogParser.java}    |   25 +-
 .../org/apache/hadoop/ozone/shell/Handler.java     |   17 +
 .../ozone/shell/bucket/CreateBucketHandler.java    |   24 +-
 .../ozone/shell/bucket/ListBucketHandler.java      |   10 +-
 .../hadoop/ozone/shell/keys/CopyKeyHandler.java    |   18 +-
 .../hadoop/ozone/shell/keys/ListKeyHandler.java    |    8 +-
 .../hadoop/ozone/shell/keys/PutKeyHandler.java     |   18 +-
 .../ozone/shell/volume/ListVolumeHandler.java      |    9 +-
 .../ozone/debug/TestDBDefinitionFactory.java       |    7 +-
 pom.xml                                            |   79 +-
 578 files changed, 17668 insertions(+), 7767 deletions(-)

diff --cc 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 616a5dc,8d6ea10..ab1c52e
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@@ -48,11 -47,8 +48,12 @@@ import org.apache.hadoop.ozone.om.helpe
  import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
  import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
  import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 +import org.apache.hadoop.ozone.om.helpers.TenantInfoList;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserList;
  import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+ import org.apache.hadoop.ozone.om.protocol.S3Auth;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
  import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
  import org.apache.hadoop.ozone.security.acl.OzoneObj;
diff --cc 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 830a707,9ca8693..ad0c13f
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@@ -106,14 -104,12 +106,16 @@@ import org.apache.hadoop.ozone.om.helpe
  import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
  import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
  import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 +import org.apache.hadoop.ozone.om.helpers.TenantInfoList;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserList;
  import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+ import org.apache.hadoop.ozone.om.protocol.S3Auth;
  import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
  import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
+ import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerClientProtocol;
  import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
  import org.apache.hadoop.ozone.security.GDPRSymmetricKey;
  import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
diff --cc 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 97fbb34d,2dd5113..d671af2
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@@ -277,25 -285,14 +285,37 @@@ public final class OMConfigKeys 
    public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
  
    /**
+    * Configuration properties for OMAdminProtocol service.
+    */
+   public static final String OZONE_OM_ADMIN_PROTOCOL_MAX_RETRIES_KEY =
+       "ozone.om.admin.protocol.max.retries";
+   public static final int OZONE_OM_ADMIN_PROTOCOL_MAX_RETRIES_DEFAULT = 20;
+   public static final String OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_KEY 
=
+       "ozone.om.admin.protocol.wait.between.retries";
+   public static final long 
OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT
+       = 1000;
+ 
++  /**
 +   * Temporary configuration properties for Ranger REST use in multitenancy.
 +   */
 +  public static final String OZONE_RANGER_OM_IGNORE_SERVER_CERT =
 +      "ozone.om.ranger.ignore.cert";
 +  public static final boolean OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT =
 +      true;
 +  public static final String OZONE_RANGER_OM_CONNECTION_TIMEOUT =
 +      "ozone.om.ranger.connection.timeout";
 +  public static final String OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT = 
"5s";
 +  public static final String OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT =
 +      "ozone.om.ranger.connection.request.timeout";
 +  public static final String
 +      OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "5s";
 +  public static final String OZONE_OM_RANGER_HTTPS_ADMIN_API_USER =
 +      "ozone.om.ranger.https.admin.api.user";
 +  public static final String OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD =
 +      "ozone.om.ranger.https.admin.api.passwd";
 +  public static final String OZONE_RANGER_HTTPS_ADDRESS_KEY =
 +      "ozone.om.ranger.https-address";
 +  public static final String OZONE_RANGER_SERVICE =
 +      "ozone.om.ranger.service";
++
  }
diff --cc 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
index b8f5b4e,0000000..a7e3487
mode 100644,000000..100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
@@@ -1,485 -1,0 +1,485 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +package org.apache.hadoop.ozone.om.multitenant;
 +
 +import static java.net.HttpURLConnection.HTTP_OK;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_ROLE_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_GROUP_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT;
 +import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.io.OutputStream;
 +import java.net.URL;
 +import java.nio.charset.StandardCharsets;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +
 +import javax.net.ssl.HttpsURLConnection;
 +import javax.net.ssl.SSLContext;
 +import javax.net.ssl.TrustManager;
 +import javax.net.ssl.X509TrustManager;
 +
 +import com.google.gson.JsonArray;
 +import com.google.gson.JsonObject;
 +import com.google.gson.JsonParseException;
 +import com.google.gson.JsonParser;
 +import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.hadoop.conf.Configuration;
- import org.apache.commons.net.util.Base64;
++import org.apache.kerby.util.Base64;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.ozone.om.exceptions.OMException;
 +import org.apache.hadoop.ozone.security.acl.IOzoneObj;
 +import org.apache.hadoop.ozone.security.acl.RequestContext;
 +import org.apache.http.auth.BasicUserPrincipal;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Implements MultiTenantAccessAuthorizer for Apache Ranger.
 + */
 +public class MultiTenantAccessAuthorizerRangerPlugin implements
 +    MultiTenantAccessAuthorizer {
 +  public static final Logger LOG = LoggerFactory
 +      .getLogger(MultiTenantAccessAuthorizerRangerPlugin.class);
 +
 +  private OzoneConfiguration conf;
 +  private boolean ignoreServerCert = false;
 +  private int connectionTimeout;
 +  private int connectionRequestTimeout;
 +  private String authHeaderValue;
 +  private String rangerHttpsAddress;
 +
 +  @Override
 +  public void init(Configuration configuration) throws IOException {
 +    conf = new OzoneConfiguration(configuration);
 +    rangerHttpsAddress = conf.get(OZONE_RANGER_HTTPS_ADDRESS_KEY);
 +    initializeRangerConnection();
 +  }
 +
 +  private void initializeRangerConnection() {
 +    setupRangerConnectionConfig();
 +    if (ignoreServerCert) {
 +      setupRangerIgnoreServerCertificate();
 +    }
 +    setupRangerConnectionAuthHeader();
 +  }
 +
 +  private void setupRangerConnectionConfig() {
 +    connectionTimeout = (int) conf.getTimeDuration(
 +        OZONE_RANGER_OM_CONNECTION_TIMEOUT,
 +        conf.get(
 +            OZONE_RANGER_OM_CONNECTION_TIMEOUT,
 +            OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT),
 +        TimeUnit.MILLISECONDS);
 +    connectionRequestTimeout = (int)conf.getTimeDuration(
 +        OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
 +        conf.get(
 +            OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
 +            OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
 +        TimeUnit.MILLISECONDS
 +    );
 +    ignoreServerCert = (boolean) conf.getBoolean(
 +        OZONE_RANGER_OM_IGNORE_SERVER_CERT,
 +            OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT);
 +  }
 +
 +  private void setupRangerIgnoreServerCertificate() {
 +    // Create a trust manager that does not validate certificate chains
 +    TrustManager[] trustAllCerts = new TrustManager[]{
 +        new X509TrustManager() {
 +          public java.security.cert.X509Certificate[] getAcceptedIssuers() {
 +            return null;
 +          }
 +          public void checkClientTrusted(
 +              java.security.cert.X509Certificate[] certs, String authType) {
 +          }
 +          public void checkServerTrusted(
 +              java.security.cert.X509Certificate[] certs, String authType) {
 +          }
 +        }
 +    };
 +
 +    try {
 +      SSLContext sc = SSLContext.getInstance("SSL");
 +      sc.init(null, trustAllCerts, new java.security.SecureRandom());
 +      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
 +    } catch (Exception e) {
 +      LOG.info("Setting DefaultSSLSocketFactory failed.");
 +    }
 +  }
 +
 +  private void setupRangerConnectionAuthHeader() {
 +    String userName = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER);
 +    String passwd = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
 +    String auth = userName + ":" + passwd;
 +    byte[] encodedAuth =
 +        Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
 +    authHeaderValue = "Basic " +
 +        new String(encodedAuth, StandardCharsets.UTF_8);
 +  }
 +
 +
 +  @Override
 +  public void shutdown() throws Exception {
 +    // TBD
 +  }
 +
 +  @Override
 +  public void grantAccess(BucketNameSpace bucketNameSpace,
 +                          BasicUserPrincipal user, ACLType aclType) {
 +    // TBD
 +  }
 +
 +  @Override
 +  public void revokeAccess(BucketNameSpace bucketNameSpace,
 +                           BasicUserPrincipal user, ACLType aclType) {
 +    // TBD
 +  }
 +
 +  @Override
 +  public void grantAccess(AccountNameSpace accountNameSpace,
 +                          BasicUserPrincipal user, ACLType aclType) {
 +    // TBD
 +  }
 +
 +  @Override
 +  public void revokeAccess(AccountNameSpace accountNameSpace,
 +                           BasicUserPrincipal user, ACLType aclType) {
 +    // TBD
 +  }
 +
 +  public List<Pair<BucketNameSpace, ACLType>>
 +      getAllBucketNameSpaceAccesses(BasicUserPrincipal user) {
 +    // TBD
 +    return null;
 +  }
 +
 +  @Override
 +  public boolean checkAccess(BucketNameSpace bucketNameSpace,
 +                             BasicUserPrincipal user) {
 +    // TBD
 +    return true;
 +  }
 +
 +  @Override
 +  public boolean checkAccess(AccountNameSpace accountNameSpace,
 +                             BasicUserPrincipal user) {
 +    // TBD
 +    return true;
 +  }
 +
 +  @Override
 +  public boolean checkAccess(IOzoneObj ozoneObject, RequestContext context)
 +      throws OMException {
 +    // TBD
 +    return true;
 +  }
 +
 +  @Override
 +  public String getRole(OzoneTenantRolePrincipal principal) throws 
IOException {
 +
 +    String endpointUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT +
 +            principal.getName();
 +
 +    HttpsURLConnection conn = makeHttpsGetCall(endpointUrl, "GET", false);
 +    return getResponseData(conn);
 +  }
 +
 +  @Override
 +  public String getUserId(BasicUserPrincipal principal) throws IOException {
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT +
 +        principal.getName();
 +
 +    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl,
 +        "GET", false);
 +    String response = getResponseData(conn);
 +    String userIDCreated = null;
 +    try {
 +      JsonObject jResonse = new 
JsonParser().parse(response).getAsJsonObject();
 +      JsonArray userinfo = jResonse.get("vXUsers").getAsJsonArray();
 +      int numIndex = userinfo.size();
 +      for (int i = 0; i < numIndex; ++i) {
 +        if (userinfo.get(i).getAsJsonObject().get("name").getAsString()
 +            .equals(principal.getName())) {
 +          userIDCreated =
 +              userinfo.get(i).getAsJsonObject().get("id").getAsString();
 +          break;
 +        }
 +      }
 +      LOG.debug("User ID is: {}", userIDCreated);
 +    } catch (JsonParseException e) {
 +      e.printStackTrace();
 +      throw e;
 +    }
 +    return userIDCreated;
 +  }
 +
 +  /**
 +   * Update the existing role details and push the changes to Ranger.
 +   *
 +   * @param principal contains user name, must be an existing user in Ranger.
 +   * @param existingRole An existing role's JSON response String from Ranger.
 +   * @param isAdmin Make it delegated admin of the role.
 +   * @return roleId (not useful for now)
 +   * @throws IOException
 +   */
 +  public String assignUser(BasicUserPrincipal principal, String existingRole,
 +      boolean isAdmin) throws IOException {
 +
 +    JsonObject roleObj = new 
JsonParser().parse(existingRole).getAsJsonObject();
 +    // Parse Json
 +    final String roleId = roleObj.get("id").getAsString();
 +    LOG.debug("Got roleId: {}", roleId);
 +
 +    JsonArray usersArray = roleObj.getAsJsonArray("users");
 +    JsonObject newUserEntry = new JsonObject();
 +    newUserEntry.addProperty("name", principal.getName());
 +    newUserEntry.addProperty("isAdmin", isAdmin);
 +    usersArray.add(newUserEntry);
 +    // Update Json array
 +    roleObj.add("users", usersArray);
 +
 +    LOG.debug("Updated: {}", roleObj);
 +
 +    final String endpointUrl = rangerHttpsAddress +
 +        OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT + roleId;
 +    final String jsonData = roleObj.toString();
 +
 +    HttpsURLConnection conn =
 +        makeHttpCall(endpointUrl, jsonData, "PUT", false);
 +    if (conn.getResponseCode() != HTTP_OK) {
 +      throw new IOException("Ranger REST API failure: " + 
conn.getResponseCode()
 +          + " " + conn.getResponseMessage()
 +          + ". Error updating Ranger role.");
 +    }
 +    String resp = getResponseData(conn);
 +    String returnedRoleId;
 +    try {
 +      JsonObject jObject = new JsonParser().parse(resp).getAsJsonObject();
 +      returnedRoleId = jObject.get("id").getAsString();
 +      LOG.debug("Ranger returns roleId: {}", roleId);
 +    } catch (JsonParseException e) {
 +      e.printStackTrace();
 +      throw e;
 +    }
 +    return returnedRoleId;
 +  }
 +
 +  private String getCreateRoleJsonStr(String roleName, String adminRoleName) {
 +    return "{"
 +        + "  \"name\":\"" + roleName + "\","
 +        + "  \"description\":\"Role created by Ozone for Multi-Tenancy\""
 +        + (adminRoleName == null ? "" : ", \"roles\":"
 +        + "[{\"name\":\"" + adminRoleName + "\",\"isAdmin\": true}]")
 +        + "}";
 +  }
 +
 +  public String createRole(OzoneTenantRolePrincipal role, String 
adminRoleName)
 +      throws IOException {
 +
 +    String endpointUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_CREATE_ROLE_HTTP_ENDPOINT;
 +
 +    String jsonData = getCreateRoleJsonStr(role.toString(), adminRoleName);
 +
 +    final HttpsURLConnection conn = makeHttpCall(endpointUrl,
 +        jsonData, "POST", false);
 +    if (conn.getResponseCode() != HTTP_OK) {
 +      throw new IOException("Ranger REST API failure: " + 
conn.getResponseCode()
 +          + " " + conn.getResponseMessage()
 +          + ". Role name '" + role + "' likely already exists in Ranger");
 +    }
 +    String roleInfo = getResponseData(conn);
 +    String roleId;
 +    try {
 +      JsonObject jObject = new JsonParser().parse(roleInfo).getAsJsonObject();
 +      roleId = jObject.get("id").getAsString();
 +      LOG.debug("Ranger returned roleId: {}", roleId);
 +    } catch (JsonParseException e) {
 +      e.printStackTrace();
 +      throw e;
 +    }
 +    return roleId;
 +  }
 +
 +  public String createAccessPolicy(AccessPolicy policy) throws Exception {
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + 
OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT;
 +
 +    HttpsURLConnection conn = makeHttpCall(rangerAdminUrl,
 +        policy.serializePolicyToJsonString(),
 +        "POST", false);
 +    String policyInfo = getResponseData(conn);
 +    String policyID;
 +    try {
 +      JsonObject jObject = new 
JsonParser().parse(policyInfo).getAsJsonObject();
 +      policyID = jObject.get("id").getAsString();
 +      LOG.debug("policyID is: {}", policyID);
 +    } catch (JsonParseException e) {
 +      e.printStackTrace();
 +      throw e;
 +    }
 +    return policyID;
 +  }
 +
 +  public AccessPolicy getAccessPolicyByName(String policyName)
 +      throws Exception {
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT +
 +        policyName;
 +
 +    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl,
 +        "GET", false);
 +    String policyInfo = getResponseData(conn);
 +    JsonArray jArry = new JsonParser().parse(policyInfo).getAsJsonArray();
 +    JsonObject jsonObject = jArry.get(0).getAsJsonObject();
 +    AccessPolicy policy = new RangerAccessPolicy(policyName);
 +    policy.deserializePolicyFromJsonString(jsonObject);
 +    return policy;
 +  }
 +
 +  public void deleteUser(String userId) throws IOException {
 +
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT
 +            + userId + "?forceDelete=true";
 +
 +    HttpsURLConnection conn = makeHttpCall(rangerAdminUrl, null,
 +        "DELETE", false);
 +    int respnseCode = conn.getResponseCode();
 +    if (respnseCode != 200 && respnseCode != 204) {
 +      throw new IOException("Couldnt delete user " + userId);
 +    }
 +  }
 +
 +  public void deleteRole(String groupId) throws IOException {
 +
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_GROUP_HTTP_ENDPOINT
 +            + groupId + "?forceDelete=true";
 +
 +    HttpsURLConnection conn = makeHttpCall(rangerAdminUrl, null,
 +        "DELETE", false);
 +    int respnseCode = conn.getResponseCode();
 +    if (respnseCode != 200 && respnseCode != 204) {
 +      throw new IOException("Couldnt delete group " + groupId);
 +    }
 +  }
 +
 +  @Override
 +  public void deletePolicybyName(String policyName) throws Exception {
 +    AccessPolicy policy = getAccessPolicyByName(policyName);
 +    String  policyID = policy.getPolicyID();
 +    LOG.debug("policyID is: {}", policyID);
 +    deletePolicybyId(policyID);
 +  }
 +
 +  public void deletePolicybyId(String policyId) throws IOException {
 +
 +    String rangerAdminUrl =
 +        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT
 +            + policyId + "?forceDelete=true";
 +    try {
 +      HttpsURLConnection conn = makeHttpCall(rangerAdminUrl, null,
 +          "DELETE", false);
 +      int respnseCode = conn.getResponseCode();
 +      if (respnseCode != 200 && respnseCode != 204) {
 +        throw new IOException("Couldnt delete policy " + policyId);
 +      }
 +    } catch (Exception e) {
 +      throw new IOException("Couldnt delete policy " + policyId, e);
 +    }
 +  }
 +
 +  private String getResponseData(HttpsURLConnection urlConnection)
 +      throws IOException {
 +    StringBuilder response = new StringBuilder();
 +    try (BufferedReader br = new BufferedReader(
 +        new InputStreamReader(urlConnection.getInputStream(),
 +            StandardCharsets.UTF_8))) {
 +      String responseLine;
 +      while ((responseLine = br.readLine()) != null) {
 +        response.append(responseLine.trim());
 +      }
 +      LOG.debug("Got response: {}", response);
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      throw e;
 +    }
 +    return response.toString();
 +  }
 +
 +  private HttpsURLConnection makeHttpCall(String urlString,
 +                                              String jsonInputString,
 +                                              String method, boolean isSpnego)
 +      throws IOException {
 +
 +    URL url = new URL(urlString);
 +    HttpsURLConnection urlConnection = 
(HttpsURLConnection)url.openConnection();
 +    urlConnection.setRequestMethod(method);
 +    urlConnection.setConnectTimeout(connectionTimeout);
 +    urlConnection.setReadTimeout(connectionRequestTimeout);
 +    urlConnection.setRequestProperty("Accept", "application/json");
 +    urlConnection.setRequestProperty("Authorization", authHeaderValue);
 +
 +    if ((jsonInputString != null) && !jsonInputString.isEmpty()) {
 +      urlConnection.setDoOutput(true);
 +      urlConnection.setRequestProperty("Content-Type", "application/json;");
 +      try (OutputStream os = urlConnection.getOutputStream()) {
 +        byte[] input = jsonInputString.getBytes(StandardCharsets.UTF_8);
 +        os.write(input, 0, input.length);
 +        os.flush();
 +      }
 +    }
 +
 +    return urlConnection;
 +  }
 +
 +  private HttpsURLConnection makeHttpsGetCall(String urlString,
 +      String method, boolean isSpnego) throws IOException {
 +
 +    URL url = new URL(urlString);
 +    HttpsURLConnection urlConnection = 
(HttpsURLConnection)url.openConnection();
 +    urlConnection.setRequestMethod(method);
 +    urlConnection.setConnectTimeout(connectionTimeout);
 +    urlConnection.setReadTimeout(connectionRequestTimeout);
 +    urlConnection.setRequestProperty("Accept", "application/json");
 +    urlConnection.setRequestProperty("Authorization", authHeaderValue);
 +
 +    return urlConnection;
 +  }
 +}
diff --cc 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 0c8c833,7bc67da..b9bf572
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@@ -451,109 -520,24 +525,145 @@@ public interface OzoneManagerProtoco
     * @return S3SecretValue
     * @throws IOException
     */
-   S3SecretValue getS3Secret(String kerberosID) throws IOException;
+   default S3SecretValue getS3Secret(String kerberosID) throws IOException {
+     throw new UnsupportedOperationException("OzoneManager does not require " +
+         "this to be implemented, as write requests use a new approach.");
+   }
  
 +  /**
 +   * Gets s3Secret for given kerberos user.
 +   * <p>
 +   * The default implementation does not perform the lookup; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param kerberosID kerberos user whose s3Secret is requested.
 +   * @param createIfNotExist presumably requests creation of a secret when
 +   *                         none exists yet (inferred from the name only).
 +   * @return S3SecretValue
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   S3SecretValue getS3Secret(String kerberosID, boolean createIfNotExist)
-           throws IOException;
++  default S3SecretValue getS3Secret(String kerberosID,
++                                    boolean createIfNotExist)
++      throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
 +
 +  /**
 +   * Set secret key for accessId.
 +   * <p>
 +   * The default implementation does not perform the operation; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param accessId access ID whose secret key is being set.
 +   * @param secretKey secret key to set for the access ID.
 +   * @return S3SecretValue
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   S3SecretValue setS3Secret(String accessId, String secretKey)
-       throws IOException;
++  default S3SecretValue setS3Secret(String accessId, String secretKey)
++      throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
  
    /**
     * Revokes s3Secret of given kerberos user.
     * <p>
     * The default implementation does not perform the operation; it always
     * throws {@link UnsupportedOperationException}.
     * @param kerberosID kerberos user whose s3Secret is to be revoked.
     * @throws IOException
     * @throws UnsupportedOperationException always, from the default
     *                                       implementation.
     */
-   void revokeS3Secret(String kerberosID) throws IOException;
+   default void revokeS3Secret(String kerberosID) throws IOException {
+     throw new UnsupportedOperationException("OzoneManager does not require " +
+         "this to be implemented, as write requests use a new approach.");
+   }
+ 
  
    /**
 +   * Create a tenant.
 +   * <p>
 +   * The default implementation does not perform the operation; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param omTenantArgs OmTenantArgs
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   void createTenant(OmTenantArgs omTenantArgs) throws IOException;
++  default void createTenant(OmTenantArgs omTenantArgs) throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
 +
 +  /**
 +   * Delete a tenant.
 +   * <p>
 +   * The default implementation does not perform the operation; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param tenantId tenant name.
 +   * @return DeleteTenantResponse
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   DeleteTenantResponse deleteTenant(String tenantId) throws IOException;
++  default DeleteTenantResponse deleteTenant(String tenantId)
++      throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
 +
 +  /**
 +   * Assign user to a tenant.
 +   * <p>
 +   * The default implementation does not perform the operation; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param username user name to be assigned.
 +   * @param tenantName tenant name.
 +   * @param accessId access ID.
 +   * @return S3SecretValue
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   S3SecretValue tenantAssignUserAccessId(String username, String tenantName,
-       String accessId) throws IOException;
++  default S3SecretValue tenantAssignUserAccessId(String username,
++                                                 String tenantName,
++                                                 String accessId)
++      throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
 +
 +  OmVolumeArgs getS3Volume(String accessID) throws IOException;
 +
-   // TODO: modify, delete
 +  /**
 +   * Revoke user accessId to a tenant.
 +   * <p>
 +   * The default implementation does not perform the operation; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param accessId accessId to be revoked.
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   void tenantRevokeUserAccessId(String accessId) throws IOException;
++  default void tenantRevokeUserAccessId(String accessId) throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
 +
 +  /**
-    * Assign admin role to an accessId in a tenant.
++   * Assign admin role to a user identified by an accessId in a tenant.
 +   * <p>
 +   * The default implementation does not perform the operation; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param accessId access ID.
 +   * @param tenantName tenant name.
 +   * @param delegated true if making delegated admin.
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   void tenantAssignAdmin(String accessId, String tenantName,
-       boolean delegated) throws IOException;
++  default void tenantAssignAdmin(String accessId,
++                                 String tenantName,
++                                 boolean delegated)
++      throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
 +
 +  /**
 +   * Revoke admin role of an accessId in a tenant.
 +   * <p>
 +   * The default implementation does not perform the operation; it always
 +   * throws {@link UnsupportedOperationException}.
 +   * @param accessId access ID.
 +   * @param tenantName tenant name.
 +   * @throws IOException
 +   * @throws UnsupportedOperationException always, from the default
 +   *                                       implementation.
 +   */
-   void tenantRevokeAdmin(String accessId, String tenantName) throws 
IOException;
++  default void tenantRevokeAdmin(String accessId,
++                                 String tenantName) throws IOException {
++    throw new UnsupportedOperationException("OzoneManager does not require " +
++        "this to be implemented, as write requests use a new approach");
++  }
 +
 +  /**
 +   * Get tenant info for a user.
 +   * <p>
 +   * Unlike the tenant write operations declared in this interface, this
 +   * method has no default implementation and must be provided by every
 +   * implementor.
 +   * @param userPrincipal Kerberos principal of a user.
 +   * @return TenantUserInfoValue
 +   * @throws IOException
 +   */
 +  TenantUserInfoValue tenantGetUserInfo(String userPrincipal)
 +      throws IOException;
 +
 +  TenantUserList listUsersInTenant(String tenantName, String prefix)
 +      throws IOException;
 +
 +  /**
 +   * List tenants.
 +   * <p>
 +   * This method has no default implementation and must be provided by
 +   * every implementor.
 +   * @return TenantInfoList
 +   * @throws IOException
 +   */
 +  TenantInfoList listTenant() throws IOException;
 +
 +  /**
     * OzoneFS api to get file status for an entry.
     *
     * @param keyArgs Key args
diff --cc 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index b861b94,b2c367f..9e0f789
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@@ -57,10 -56,7 +57,10 @@@ import org.apache.hadoop.ozone.om.helpe
  import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
  import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
  import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 +import org.apache.hadoop.ozone.om.helpers.TenantInfoList;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserList;
- import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+ import org.apache.hadoop.ozone.om.protocol.S3Auth;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
diff --cc 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 2ee7710,8462caa..21f8f6f
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@@ -65,12 -65,12 +65,16 @@@ public class TestOzoneConfigurationFiel
          ".duration"); // Deprecated config
      configurationPropsToSkipCompare
          .add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
+     configurationPropsToSkipCompare
+         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY);
+     configurationPropsToSkipCompare
+         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION);
      // This property is tested in TestHttpServer2 instead
      xmlPropsToSkipCompare.add(HttpServer2.HTTP_IDLE_TIMEOUT_MS_KEY);
 +
 +    // TODO: Remove this once ranger configs are finalized in HDDS-5836
 +    configurationPrefixToSkipCompare.add("ozone.om.ranger");
 +
      addPropertiesNotInXml();
    }
  
diff --cc hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index de4a497,1dd922c..5cfb56c
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@@ -102,22 -102,6 +102,22 @@@ enum Type 
    RevokeS3Secret = 93;
  
    PurgePaths = 94;
 +
-   CreateTenant = 95;
-   DeleteTenant = 97;  // TODO: Renumber when rebasing
-   ListTenant = 103;
++  CreateTenant = 96;
++  DeleteTenant = 97;
++  ListTenant = 98;
 +
-   TenantGetUserInfo = 98;
-   TenantAssignUserAccessId = 99;
-   TenantRevokeUserAccessId = 100;
++  TenantGetUserInfo = 99;
++  TenantAssignUserAccessId = 100;
++  TenantRevokeUserAccessId = 101;
 +
-   TenantAssignAdmin = 101;
-   TenantRevokeAdmin = 102;
++  TenantAssignAdmin = 102;
++  TenantRevokeAdmin = 103;
 +
 +  GetS3Volume = 104;
 +  TenantListUser = 105;
 +
 +  SetS3Secret = 106;
  }
  
  message OMRequest {
@@@ -201,21 -185,7 +201,23 @@@
  
    optional PurgePathsRequest                purgePathsRequest              = 
94;
  
-   optional CreateTenantRequest              CreateTenantRequest            = 
95;
+   optional S3Authentication                 s3Authentication               = 
95;
++
++  optional CreateTenantRequest              CreateTenantRequest            = 
96;
 +  optional DeleteTenantRequest              DeleteTenantRequest            = 
97;
-   optional ListTenantRequest                ListTenantRequest              = 
103;  // TODO: Renumber this when rebasing
++  optional ListTenantRequest                ListTenantRequest              = 
98;
 +
-   optional TenantGetUserInfoRequest         TenantGetUserInfoRequest       = 
98;
-   optional TenantAssignUserAccessIdRequest  TenantAssignUserAccessIdRequest= 
99;
-   optional TenantRevokeUserAccessIdRequest  TenantRevokeUserAccessIdRequest= 
100;
++  optional TenantGetUserInfoRequest         TenantGetUserInfoRequest       = 
99;
++  optional TenantAssignUserAccessIdRequest  TenantAssignUserAccessIdRequest= 
100;
++  optional TenantRevokeUserAccessIdRequest  TenantRevokeUserAccessIdRequest= 
101;
 +
-   optional TenantAssignAdminRequest         TenantAssignAdminRequest       = 
101;
-   optional TenantRevokeAdminRequest         TenantRevokeAdminRequest       = 
102;
++  optional TenantAssignAdminRequest         TenantAssignAdminRequest       = 
102;
++  optional TenantRevokeAdminRequest         TenantRevokeAdminRequest       = 
103;
 +
 +  optional GetS3VolumeRequest               getS3VolumeRequest             = 
104;
 +  optional TenantListUserRequest            tenantListUserRequest          = 
105;
 +
 +  optional SetS3SecretRequest               SetS3SecretRequest             = 
106;
  }
  
  message OMResponse {
@@@ -292,23 -262,6 +294,23 @@@
    optional ListTrashResponse                  listTrashResponse            = 
91;
    optional RecoverTrashResponse               RecoverTrashResponse         = 
92;
    optional PurgePathsResponse                 purgePathsResponse           = 
93;
 +
-   // Skipped 94 to align with OMRequest
-   optional CreateTenantResponse              CreateTenantResponse          = 
95;
++  // Skipped 94/95 to align with OMRequest
++  optional CreateTenantResponse              CreateTenantResponse          = 
96;
 +  optional DeleteTenantResponse              DeleteTenantResponse          = 
97;
-   optional ListTenantResponse                ListTenantResponse            = 
103;  // TODO: Renumber this when rebasing
++  optional ListTenantResponse                ListTenantResponse            = 
98;
 +
-   optional TenantGetUserInfoResponse         TenantGetUserInfoResponse     = 
98;
-   optional TenantAssignUserAccessIdResponse  
TenantAssignUserAccessIdResponse= 99;
-   optional TenantRevokeUserAccessIdResponse  
TenantRevokeUserAccessIdResponse= 100;
++  optional TenantGetUserInfoResponse         TenantGetUserInfoResponse     = 
99;
++  optional TenantAssignUserAccessIdResponse  
TenantAssignUserAccessIdResponse= 100;
++  optional TenantRevokeUserAccessIdResponse  
TenantRevokeUserAccessIdResponse= 101;
 +
-   optional TenantAssignAdminResponse         TenantAssignAdminResponse     = 
101;
-   optional TenantRevokeAdminResponse         TenantRevokeAdminResponse     = 
102;
++  optional TenantAssignAdminResponse         TenantAssignAdminResponse     = 
102;
++  optional TenantRevokeAdminResponse         TenantRevokeAdminResponse     = 
103;
 +
 +  optional GetS3VolumeResponse               getS3VolumeResponse           = 
104;
 +  optional TenantListUserResponse            tenantListUserResponse        = 
105;
 +
 +  optional SetS3SecretResponse               SetS3SecretResponse           = 
106;
  }
  
  enum Status {
@@@ -1541,9 -1347,16 +1545,18 @@@ message UpdateGetS3SecretRequest 
      required string awsSecret = 2;
  }
  
 +// MARK
 +
  /**
+   This will be used by OM to authenticate S3 gateway requests on a
+   per-request basis.
+ */
+ message S3Authentication {
+     optional string stringToSign = 1;
+     optional string signature = 2;
+     optional string accessId = 3;
+ }
+ 
+ /**
   The OM service that takes care of Ozone namespace.
  */
  service OzoneManagerService {
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 899749a,8077d07..0e07b64
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@@ -66,7 -67,7 +67,10 @@@ import org.apache.hadoop.hdds.protocol.
  import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
  import org.apache.hadoop.hdds.scm.ScmInfo;
  import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
- import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
++import org.apache.hadoop.hdds.utils.db.Table;
++import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
++import org.apache.hadoop.hdds.utils.db.TableIterator;
+ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
  import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
  import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
  import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@@ -114,34 -112,18 +115,24 @@@ import org.apache.hadoop.ozone.om.excep
  import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
  import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
  import org.apache.hadoop.ozone.om.helpers.DBUpdates;
- import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
  import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 +import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 +import org.apache.hadoop.ozone.om.helpers.OmDBKerberosPrincipalInfo;
 +import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
- import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
  import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
  import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
- import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
- import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
- import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
- import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
- import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
  import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
  import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
- import org.apache.hadoop.ozone.om.helpers.OmRenameKeys;
- import org.apache.hadoop.ozone.om.helpers.OmTenantArgs;
  import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
- import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
  import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
  import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
- import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
  import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
  import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 +import org.apache.hadoop.ozone.om.helpers.TenantInfoList;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 +import org.apache.hadoop.ozone.om.helpers.TenantUserList;
  import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
+ import org.apache.hadoop.ozone.om.protocol.OMConfiguration;
  import 
org.apache.hadoop.ozone.om.protocolPB.OMInterServiceProtocolClientSideImpl;
  import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
  import org.apache.hadoop.ozone.om.protocolPB.OMInterServiceProtocolPB;
@@@ -159,12 -144,10 +153,12 @@@ import org.apache.hadoop.ozone.protocol
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
- import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3Authentication;
  import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAccessIdInfo;
 +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantInfo;
  import 
org.apache.hadoop.ozone.protocolPB.OMInterServiceProtocolServerSideImpl;
+ import org.apache.hadoop.ozone.protocolPB.OMAdminProtocolServerSideImpl;
  import 
org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
  import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
  import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
@@@ -257,10 -240,8 +251,10 @@@ import static org.apache.hadoop.ozone.o
  import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS;
  import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
  import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 +import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_ACCESSID;
- import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 +import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERMISSION_DENIED;
  import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+ import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
  import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
  import static 
org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
  import static 
org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.getRaftGroupIdFromOmServiceId;
@@@ -3060,359 -2857,7 +2883,171 @@@ public final class OzoneManager extend
      return upgradeFinalizer.reportStatus(upgradeClientID, takeover);
    }
  
-   @Override
-   /**
-    * {@inheritDoc}
-    */
-   public S3SecretValue getS3Secret(String kerberosID) throws IOException {
-     UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
- 
-     // Check whether user name passed is matching with the current user or 
not.
-     if (!user.getUserName().equals(kerberosID)) {
-       throw new OMException("User mismatch. Requested user name is " +
-           "mismatched " + kerberosID + ", with current user " +
-           user.getUserName(), OMException.ResultCodes.USER_MISMATCH);
-     }
-     return s3SecretManager.getS3Secret(kerberosID);
-   }
- 
-   @Override
-   /**
-    * {@inheritDoc}
-    */
-   public S3SecretValue getS3Secret(String kerberosID, boolean 
createIfNotExist)
-           throws IOException {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-         "this to be implemented. As write requests use a new approach");
-   }
- 
-   @Override
-   public void createTenant(OmTenantArgs omTenantArgs) {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-         "this to be implemented. As write requests use a new approach");
-   }
- 
-   @Override
-   public DeleteTenantResponse deleteTenant(String tenantId) {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-         "this to be implemented. As write requests use a new approach");
-   }
- 
-   /**
-    * Assign user accessId to tenant.
-    */
-   public S3SecretValue tenantAssignUserAccessId(
-       String username, String tenantId, String accessId) throws IOException {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-         "this to be implemented. As write requests use a new approach");
-   }
- 
-   /**
-    * Revoke user accessId to tenant.
-    */
-   public void tenantRevokeUserAccessId(String accessId) throws IOException {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-         "this to be implemented. As write requests use a new approach");
-   }
- 
-   /**
-    * Assign admin role to a user by an accessId in a tenant.
-    */
-   public void tenantAssignAdmin(String accessId, String tenantId,
-                                 boolean delegated) {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-         "this to be implemented. As write requests use a new approach");
-   }
- 
-   /**
-    * Revoke admin role of an accessId from a tenant.
-    */
-   public void tenantRevokeAdmin(String accessId, String tenantId) {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-         "this to be implemented. As write requests use a new approach");
-   }
- 
 +  /**
 +   * List tenants.
 +   */
 +  public TenantInfoList listTenant() throws IOException {
 +
 +    final UserGroupInformation ugi = getRemoteUser();
 +    if (!isAdmin(ugi)) {
 +      final OMException omEx = new OMException(
 +          "Only Ozone admins are allowed to list tenants.", 
PERMISSION_DENIED);
 +      AUDIT.logWriteFailure(buildAuditMessageForFailure(
 +          OMAction.LIST_TENANT, new LinkedHashMap<>(), omEx));
 +      throw omEx;
 +    }
 +
 +    final List<TenantInfo> tenantInfoList = new ArrayList<>();
 +
 +    // TODO: Iterate cache first. See KeyManagerImpl#listStatus
 +
-     TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantInfo>>
++    TableIterator<String, ? extends KeyValue<String, OmDBTenantInfo>>
 +        iterator = metadataManager.getTenantStateTable().iterator();
 +
 +    while (iterator.hasNext()) {
 +      final Table.KeyValue<String, OmDBTenantInfo> dbEntry = iterator.next();
 +      final OmDBTenantInfo omDBTenantInfo = dbEntry.getValue();
 +      assert(dbEntry.getKey().equals(omDBTenantInfo.getTenantId()));
 +      tenantInfoList.add(TenantInfo.newBuilder()
 +          .setTenantName(omDBTenantInfo.getTenantId())
 +          .setBucketNamespaceName(omDBTenantInfo.getBucketNamespaceName())
 +          .setAccountNamespaceName(omDBTenantInfo.getAccountNamespaceName())
 +          .setUserPolicyGroupName(omDBTenantInfo.getUserPolicyGroupName())
 +          .setBucketNamespaceName(omDBTenantInfo.getBucketPolicyGroupName())
 +          .build());
 +    }
 +
 +    AUDIT.logReadSuccess(buildAuditMessageForSuccess(
 +        OMAction.LIST_TENANT, new LinkedHashMap<>()));
 +
 +    return new TenantInfoList(tenantInfoList);
 +  }
 +
 +  /**
 +   * Tenant get user info.
 +   * <p>
 +   * Looks up the access IDs recorded for the given principal and collects,
 +   * for each of them, the tenant it belongs to and its admin / delegated
 +   * admin flags. An access ID whose lookup fails with an IOException is
 +   * logged, audited as a read failure and left out of the result.
 +   *
 +   * @param userPrincipal Kerberos principal of a user.
 +   * @return the access ID info collected for the principal, or null when
 +   *         userPrincipal is empty or has no entry in the
 +   *         principal-to-access-IDs table.
 +   * @throws IOException if the principal lookup fails.
 +   * @throws NullPointerException if an access ID listed for the principal
 +   *                              has no entry in the tenant access ID table.
 +   */
 +  @Override
 +  public TenantUserInfoValue tenantGetUserInfo(String userPrincipal)
 +      throws IOException {
 +
 +    if (StringUtils.isEmpty(userPrincipal)) {
 +      return null;
 +    }
 +
 +    final List<TenantAccessIdInfo> accessIdInfoList = new ArrayList<>();
 +
 +    // Retrieve the list of accessIds associated with this user principal
 +    final OmDBKerberosPrincipalInfo kerberosPrincipalInfo =
 +        metadataManager.getPrincipalToAccessIdsTable().get(userPrincipal);
 +    if (kerberosPrincipalInfo == null) {
 +      return null;
 +    }
 +    final Set<String> accessIds = kerberosPrincipalInfo.getAccessIds();
 +
 +    final Map<String, String> auditMap = new LinkedHashMap<>();
 +    auditMap.put("userPrincipal", userPrincipal);
 +
 +    accessIds.forEach(accessId -> {
 +      try {
 +        final OmDBAccessIdInfo accessIdInfo =
 +            metadataManager.getTenantAccessIdTable().get(accessId);
 +        // Sanity check
 +        if (accessIdInfo == null) {
 +          LOG.error("Potential metadata error. Unexpected null accessIdInfo: "
 +              + "entry for accessId '{}' doesn't exist in "
 +              + "TenantAccessIdTable", accessId);
 +          throw new NullPointerException("accessIdInfo is null");
 +        }
 +        assert(accessIdInfo.getUserPrincipal().equals(userPrincipal));
 +        accessIdInfoList.add(TenantAccessIdInfo.newBuilder()
 +            .setAccessId(accessId)
 +            .setTenantName(accessIdInfo.getTenantId())
 +            .setIsAdmin(accessIdInfo.getIsAdmin())
 +            .setIsDelegatedAdmin(accessIdInfo.getIsDelegatedAdmin())
 +            .build());
 +      } catch (IOException e) {
 +        // Pass the exception as the last argument so the cause is not lost.
 +        LOG.error("Potential DB issue. Failed to retrieve OmDBAccessIdInfo "
 +            + "for accessId '{}' in TenantAccessIdTable.", accessId, e);
 +        // Audit: this is a read operation, so record a read failure. The
 +        // accessId key is added only for this one audit entry.
 +        auditMap.put("accessId", accessId);
 +        AUDIT.logReadFailure(buildAuditMessageForFailure(
 +            OMAction.TENANT_GET_USER_INFO, auditMap, e));
 +        auditMap.remove("accessId");
 +      }
 +    });
 +
 +    AUDIT.logReadSuccess(buildAuditMessageForSuccess(
 +        OMAction.TENANT_GET_USER_INFO, auditMap));
 +
 +    return new TenantUserInfoValue(userPrincipal, accessIdInfoList);
 +  }
 +
 +  /**
 +   * Lists the users of a tenant, as reported by the multi-tenant manager
 +   * for the given tenant and prefix.
 +   * <p>
 +   * The remote caller must either be an admin of the tenant or appear in
 +   * the OM admin user names. Both success and failure are written to the
 +   * read audit log.
 +   *
 +   * @param tenantId tenant to list; an empty value yields null.
 +   * @param prefix prefix forwarded unchanged to the multi-tenant manager.
 +   * @return the tenant's user list, or null when tenantId is empty.
 +   * @throws IOException if the caller is not an admin or the listing fails.
 +   */
 +  @Override
 +  public TenantUserList listUsersInTenant(String tenantId, String prefix)
 +      throws IOException {
 +
 +    // Nothing to look up without a tenant.
 +    if (StringUtils.isEmpty(tenantId)) {
 +      return null;
 +    }
 +
 +    final Map<String, String> auditDetails = new LinkedHashMap<>();
 +    auditDetails.put(OzoneConsts.TENANT, tenantId);
 +    auditDetails.put(OzoneConsts.USER_PREFIX, prefix);
 +
 +    try {
 +      final String caller = getRemoteUser().getUserName();
 +      // Tenant admin is checked first; the OM admin list is consulted only
 +      // when that check says no.
 +      final boolean allowed =
 +          multiTenantManagr.isTenantAdmin(caller, tenantId)
 +              || omAdminUsernames.contains(caller);
 +      if (!allowed) {
 +        throw new IOException("Only tenant and ozone admins can access this "
 +            + "API. '" + caller + "' is not an admin.");
 +      }
 +
 +      final TenantUserList users =
 +          multiTenantManagr.listUsersInTenant(tenantId, prefix);
 +      AUDIT.logReadSuccess(buildAuditMessageForSuccess(
 +          OMAction.TENANT_LIST_USER, auditDetails));
 +      return users;
 +    } catch (IOException failure) {
 +      AUDIT.logReadFailure(buildAuditMessageForFailure(
 +          OMAction.TENANT_LIST_USER, auditDetails, failure));
 +      throw failure;
 +    }
 +  }
 +
 +  /**
 +   * Resolves the volume that requests made with the given access ID are
 +   * directed to.
 +   * <p>
 +   * When the access ID maps to a tenant, the tenant ID itself is used as
 +   * the volume name. When the lookup fails with INVALID_ACCESSID, the
 +   * default S3 volume name from the configuration is used instead. Any
 +   * other OMException is rethrown.
 +   *
 +   * @param accessID access ID to resolve.
 +   * @return the volume info of the resolved volume.
 +   * @throws IOException if tenant resolution or the volume lookup fails.
 +   */
 +  @Override
 +  public OmVolumeArgs getS3Volume(String accessID) throws IOException {
 +
 +    try {
 +      final String tenant = multiTenantManagr.getTenantForAccessID(accessID);
 +      // TODO: Get volume name from DB. Do not assume the same. e.g.
 +      //metadataManager.getTenantStateTable().get(tenantId)
 +      //    .getBucketNamespaceName();
 +      final String tenantVolume = tenant;
 +      if (LOG.isDebugEnabled()) {
 +        LOG.debug("Get S3 volume request for access ID {} belonging to tenant"
 +                + " {} is directed to the volume {}.", accessID, tenant,
 +            tenantVolume);
 +      }
 +      // This call performs acl checks and checks volume existence.
 +      return getVolumeInfo(tenantVolume);
 +
 +    } catch (OMException ex) {
 +      // Only an unknown access ID falls back to the default volume.
 +      if (!ex.getResult().equals(INVALID_ACCESSID)) {
 +        throw ex;
 +      }
 +
 +      // If the user is not associated with a tenant, they will use the
 +      // default s3 volume.
 +      final String fallbackVolume =
 +          HddsClientUtils.getDefaultS3VolumeName(configuration);
 +      if (LOG.isDebugEnabled()) {
 +        LOG.debug("No tenant found for access ID {}. Directing " +
 +            "requests to default s3 volume {}.", accessID, fallbackVolume);
 +      }
 +      return getVolumeInfo(fallbackVolume);
 +    }
 +  }
 +
    @Override
-   public S3SecretValue setS3Secret(String accessId, String secretKey) {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-             "this to be implemented. As write requests use a new approach");
-   }
- 
-   @Override
-   public void revokeS3Secret(String kerberosID) {
-     throw new UnsupportedOperationException("OzoneManager does not require " +
-             "this to be implemented. As write requests use a new approach");
-   }
- 
-   @Override
-   public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
-       IOException {
- 
-     Preconditions.checkNotNull(keyArgs);
-     ResolvedBucket bucket = resolveBucketLink(keyArgs);
- 
-     Map<String, String> auditMap = bucket.audit(keyArgs.toAuditMap());
- 
-     keyArgs = bucket.update(keyArgs);
- 
-     metrics.incNumInitiateMultipartUploads();
-     try {
-       OmMultipartInfo result = keyManager.initiateMultipartUpload(keyArgs);
-       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-           OMAction.INITIATE_MULTIPART_UPLOAD, auditMap));
-       return result;
-     } catch (IOException ex) {
-       AUDIT.logWriteFailure(buildAuditMessageForFailure(
-           OMAction.INITIATE_MULTIPART_UPLOAD, auditMap, ex));
-       metrics.incNumInitiateMultipartUploadFails();
-       throw ex;
-     }
-   }
- 
-   @Override
-   public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
-       OmKeyArgs keyArgs, long clientID) throws IOException {
- 
-     Preconditions.checkNotNull(keyArgs);
-     ResolvedBucket bucket = resolveBucketLink(keyArgs);
- 
-     Map<String, String> auditMap = bucket.audit(keyArgs.toAuditMap());
- 
-     keyArgs = bucket.update(keyArgs);
- 
-     metrics.incNumCommitMultipartUploadParts();
-     try {
-       OmMultipartCommitUploadPartInfo result =
-           keyManager.commitMultipartUploadPart(keyArgs, clientID);
-       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-           OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, auditMap));
-       return result;
-     } catch (IOException ex) {
-       AUDIT.logWriteFailure(buildAuditMessageForFailure(
-           OMAction.INITIATE_MULTIPART_UPLOAD, auditMap, ex));
-       metrics.incNumCommitMultipartUploadPartFails();
-       throw ex;
-     }
-   }
- 
-   @Override
-   public OmMultipartUploadCompleteInfo completeMultipartUpload(
-       OmKeyArgs omKeyArgs, OmMultipartUploadCompleteList multipartUploadList)
-       throws IOException {
- 
-     Preconditions.checkNotNull(omKeyArgs);
-     ResolvedBucket bucket = resolveBucketLink(omKeyArgs);
- 
-     Map<String, String> auditMap = bucket.audit(omKeyArgs.toAuditMap());
-     auditMap.put(OzoneConsts.MULTIPART_LIST, multipartUploadList
-         .getMultipartMap().toString());
- 
-     omKeyArgs = bucket.update(omKeyArgs);
- 
-     metrics.incNumCompleteMultipartUploads();
-     try {
-       OmMultipartUploadCompleteInfo result = 
keyManager.completeMultipartUpload(
-               omKeyArgs, multipartUploadList);
-       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
-           .COMPLETE_MULTIPART_UPLOAD, auditMap));
-       return result;
-     } catch (IOException ex) {
-       metrics.incNumCompleteMultipartUploadFails();
-       AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
-           .COMPLETE_MULTIPART_UPLOAD, auditMap, ex));
-       throw ex;
-     }
-   }
- 
-   @Override
-   public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
- 
-     Preconditions.checkNotNull(omKeyArgs);
-     ResolvedBucket bucket = resolveBucketLink(omKeyArgs);
- 
-     Map<String, String> auditMap = bucket.audit(omKeyArgs.toAuditMap());
- 
-     omKeyArgs = bucket.update(omKeyArgs);
- 
-     metrics.incNumAbortMultipartUploads();
-     try {
-       keyManager.abortMultipartUpload(omKeyArgs);
-       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
-           .COMPLETE_MULTIPART_UPLOAD, auditMap));
-     } catch (IOException ex) {
-       metrics.incNumAbortMultipartUploadFails();
-       AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
-           .COMPLETE_MULTIPART_UPLOAD, auditMap, ex));
-       throw ex;
-     }
- 
-   }
- 
-   @Override
    public OmMultipartUploadListParts listParts(final String volumeName,
        final String bucketName, String keyName, String uploadID,
        int partNumberMarker, int maxParts)  throws IOException {
diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 057c994,0480859..85049e1
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@@ -69,23 -54,8 +54,15 @@@ import org.apache.hadoop.ozone.om.reque
  import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixAddAclRequest;
  import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixRemoveAclRequest;
  import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequestWithFSO;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequestWithFSO;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequestWithFSO;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
- import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequestWithFSO;
 +import org.apache.hadoop.ozone.om.request.s3.security.OMSetSecretRequest;
  import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
  import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSecretRequest;
 +import org.apache.hadoop.ozone.om.request.s3.tenant.OMAssignUserToTenantRequest;
 +import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantAssignAdminRequest;
 +import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantCreateRequest;
 +import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantDeleteRequest;
 +import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantRevokeAdminRequest;
 +import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantRevokeUserAccessIdRequest;
  import org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
  import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest;
  import org.apache.hadoop.ozone.om.request.security.OMRenewDelegationTokenRequest;
@@@ -257,22 -162,28 +169,42 @@@ public final class OzoneManagerRatisUti
        return new OMPrepareRequest(omRequest);
      case CancelPrepare:
        return new OMCancelPrepareRequest(omRequest);
 +    case SetS3Secret:
 +      return new OMSetSecretRequest(omRequest);
      case RevokeS3Secret:
        return new S3RevokeSecretRequest(omRequest);
 +    case CreateTenant:
 +      return new OMTenantCreateRequest(omRequest);
 +    case DeleteTenant:
 +      return new OMTenantDeleteRequest(omRequest);
 +    case TenantAssignUserAccessId:
 +      return new OMAssignUserToTenantRequest(omRequest);
 +    case TenantRevokeUserAccessId:
 +      return new OMTenantRevokeUserAccessIdRequest(omRequest);
 +    case TenantAssignAdmin:
 +      return new OMTenantAssignAdminRequest(omRequest);
 +    case TenantRevokeAdmin:
 +      return new OMTenantRevokeAdminRequest(omRequest);
+ 
+     /**
+      * Following key requests will be created in {@link OMKeyRequestFactory}.
+      */
+     case CreateDirectory:
+     case CreateFile:
+     case CreateKey:
+     case AllocateBlock:
+     case CommitKey:
+     case DeleteKey:
+     case DeleteKeys:
+     case RenameKey:
+     case RenameKeys:
+     case PurgeKeys:
+     case PurgePaths:
+     case InitiateMultiPartUpload:
+     case CommitMultiPartUpload:
+     case AbortMultiPartUpload:
+     case CompleteMultiPartUpload:
+       return OMKeyRequestFactory.createRequest(omRequest, ozoneManager);
      default:
        throw new IllegalStateException("Unrecognized write command " +
            "type request" + cmdType);
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index a0e03c6,a4ef4a1..08eb966
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@@ -140,8 -130,11 +139,13 @@@ public abstract class OMClientRequest i
      OzoneManagerProtocolProtos.UserInfo.Builder userInfo =
          OzoneManagerProtocolProtos.UserInfo.newBuilder();
  
-     // Added not null checks, as in UT's these values might be null.
-     if (user != null) {
+     // If S3 Authentication is set, use AccessId as user.
+     if (omRequest.hasS3Authentication()) {
++      // TODO: For tenant users, translate accessId to (short) username
++      //  with multiTenantManager.getUserNameGivenAccessId(accessId)
+       userInfo.setUserName(omRequest.getS3Authentication().getAccessId());
+     } else if (user != null) {
+       // Added not null checks, as in UT's these values might be null.
        userInfo.setUserName(user.getUserName());
      }
  
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 7983ae8,7946f65..412fac5
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@@ -126,29 -127,45 +127,57 @@@ public class OzoneManagerProtocolServer
  
    private OMResponse processRequest(OMRequest request) throws
        ServiceException {
-     RaftServerStatus raftServerStatus;
 +    OMClientRequest omClientRequest = null;
      if (isRatisEnabled) {
+       boolean s3Auth = false;
+       try {
+         // If Request has S3Authentication validate S3 credentials
+         // if current OM is leader and then proceed with processing the request.
+         if (request.hasS3Authentication()) {
+           s3Auth = true;
+           checkLeaderStatus();
+           S3SecurityUtil.validateS3Credential(request, ozoneManager);
+         }
+       } catch (IOException ex) {
+         // If validate credentials fail return error OM Response.
+         return createErrorResponse(request, ex);
+       }
        // Check if the request is a read only request
        if (OmUtils.isReadOnly(request)) {
-         return submitReadRequestToOM(request);
+         try {
+           if (request.hasS3Authentication()) {
+             ozoneManager.setS3Auth(request.getS3Authentication());
+           }
+           return submitReadRequestToOM(request);
+         } finally {
+           ozoneManager.setS3Auth(null);
+         }
        } else {
-         checkLeaderStatus();
+         // To validate credentials we have already verified leader status.
+         // This will skip of checking leader status again if request has S3Auth.
+         if (!s3Auth) {
+           checkLeaderStatus();
+         }
          try {
-           omClientRequest = createClientRequest(request);
 -          OMClientRequest omClientRequest =
++          omClientRequest =
+               createClientRequest(request, ozoneManager);
++          // TODO: Note: Due to HDDS-6055, createClientRequest() could now
++          //  return null, which triggered the findbugs warning.
++          //  Added the assertion.
++          assert(omClientRequest != null);
            request = omClientRequest.preExecute(ozoneManager);
          } catch (IOException ex) {
            // As some of the preExecute returns error. So handle here.
 +          if (omClientRequest != null) {
 +            omClientRequest.handleRequestFailure(ozoneManager);
 +          }
            return createErrorResponse(request, ex);
          }
 -        return submitRequestToRatis(request);
 +        OMResponse response = submitRequestToRatis(request);
-         if (!response.getSuccess() && omClientRequest != null) {
++        if (!response.getSuccess()) {
 +          omClientRequest.handleRequestFailure(ozoneManager);
 +        }
 +        return response;
        }
      } else {
        return submitRequestDirectlyToOM(request);
diff --cc hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
index 95c2825,ff8b36e..3f5c5f7
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
@@@ -102,42 -97,11 +97,13 @@@ public class OzoneClientProducer 
  
        String awsAccessId = signatureInfo.getAwsAccessId();
        validateAccessId(awsAccessId);
- 
-       UserGroupInformation remoteUser =
-           UserGroupInformation.createRemoteUser(awsAccessId);
-       if (OzoneSecurityUtil.isSecurityEnabled(config)) {
-         LOG.debug("Creating s3 auth info for client.");
- 
-         if (signatureInfo.getVersion() == Version.NONE) {
-           throw MALFORMED_HEADER;
-         }
- 
-         OzoneTokenIdentifier identifier = new OzoneTokenIdentifier();
-         identifier.setTokenType(S3AUTHINFO);
-         identifier.setStrToSign(stringToSign);
-         identifier.setSignature(signatureInfo.getSignature());
-         identifier.setAwsAccessId(awsAccessId);
-         identifier.setOwner(new Text(awsAccessId));
-         if (LOG.isTraceEnabled()) {
-           LOG.trace("Adding token for service:{}", omService);
-         }
-         Token<OzoneTokenIdentifier> token = new Token(identifier.getBytes(),
-             identifier.getSignature().getBytes(StandardCharsets.UTF_8),
-             identifier.getKind(),
-             omService);
-         remoteUser.addToken(token);
- 
-       }
-       ozoneClient =
-           remoteUser.doAs((PrivilegedExceptionAction<OzoneClient>) () -> {
-             // TODO: Once HDDS-4440 is merged, access ID should be passed
-             //  through the OM transport.
-             return createOzoneClient(awsAccessId);
-           });
++      // TODO: Once HDDS-4440 is merged, access ID should be passed
++      //  through the OM transport. Double check @erose
+       return new S3Auth(stringToSign,
+           signatureInfo.getSignature(),
+           awsAccessId);
      } catch (OS3Exception ex) {
-       if (LOG.isDebugEnabled()) {
-         LOG.debug("Error during Client Creation: ", ex);
-       }
+       LOG.debug("Error during Client Creation: ", ex);
        throw wrapOS3Exception(ex);
      } catch (Exception e) {
        // For any other critical errors during object creation throw Internal
@@@ -150,14 -112,19 +114,29 @@@
    }
  
    @NotNull
-   private OzoneClient createOzoneClient(String accessID) throws IOException {
+   @VisibleForTesting
+   OzoneClient createOzoneClient() throws IOException {
+     // S3 Gateway should always set the S3 Auth.
+     ozoneConfiguration.setBoolean(S3Auth.S3_AUTH_CHECK, true);
+     // Set the expected OM version if not set via config.
+     ozoneConfiguration.setIfUnset(OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY,
+         OZONE_OM_CLIENT_PROTOCOL_VERSION);
++
++    // TODO: Added this snippet for a quick fix due to a conflict with HDDS-5883
++    //  Double check / optimize.
++    String accessId = null;
++    try {
++      accessId = signatureProcessor.parseSignature().getAwsAccessId();
++    } catch (OS3Exception e) {
++      LOG.error("Unable to parse signature to get accessId");
++    }
++
      if (omServiceID == null) {
-       return OzoneClientFactory.getRpcClient(ozoneConfiguration, accessID);
 -      return OzoneClientFactory.getRpcClient(ozoneConfiguration);
++      return OzoneClientFactory.getRpcClient(ozoneConfiguration, accessId);
      } else {
        // As in HA case, we need to pass om service ID.
        return OzoneClientFactory.getRpcClient(omServiceID,
-           ozoneConfiguration, accessID);
 -          ozoneConfiguration);
++          ozoneConfiguration, accessId);
      }
    }
  
diff --cc hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 4e25478,52b9c49..8068015
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@@ -50,8 -50,8 +50,9 @@@ import java.util.HashMap
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;
+ import java.util.OptionalLong;
  
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationFactor;
  import org.apache.hadoop.hdds.client.ReplicationType;
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
index 5d7e0b9,96e73de..7422806
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
@@@ -32,9 -32,9 +32,10 @@@ import java.util.List
  import java.util.Map;
  import java.util.Scanner;
  
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationFactor;
  import org.apache.hadoop.hdds.client.ReplicationType;
+ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.ozone.OzoneConsts;
  import org.apache.hadoop.ozone.client.OzoneBucket;
  import org.apache.hadoop.ozone.client.OzoneClient;
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectHead.java
index bec92af,66c7456..e0effd5
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectHead.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectHead.java
@@@ -24,9 -24,9 +24,10 @@@ import java.io.IOException
  import java.time.format.DateTimeFormatter;
  import java.util.HashMap;
  
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationFactor;
  import org.apache.hadoop.hdds.client.ReplicationType;
+ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.ozone.client.OzoneBucket;
  import org.apache.hadoop.ozone.client.OzoneClient;
  import org.apache.hadoop.ozone.client.OzoneClientStub;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to