This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 66ba5bc106536f1c51be1e4a535a0310ec7d3476
Merge: b328608 8d3817c
Author: Nandakumar <[email protected]>
AuthorDate: Tue Feb 2 10:48:51 2021 +0530

    Merge branch 'master' into HDDS-2823

 .github/workflows/post-commit.yml                  |   17 +-
 hadoop-hdds/client/pom.xml                         |    5 +
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |   18 +
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |    8 +-
 .../hadoop/hdds/scm/XceiverClientMetrics.java      |    4 +-
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |    4 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   32 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |    7 +-
 .../hdds/scm/storage/TestBlockInputStream.java     |   77 ++
 .../client/src/test/resources/log4j.properties     |   23 +
 hadoop-hdds/common/src/main/conf/hadoop-env.cmd    |   90 --
 hadoop-hdds/common/src/main/conf/hadoop-env.sh     |  451 -------
 hadoop-hdds/common/src/main/conf/ozone-env.sh      |  279 ++++
 .../org/apache/hadoop/hdds/client/OzoneQuota.java  |   98 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java      |   18 +-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   13 +-
 .../common/src/main/resources/ozone-default.xml    |   18 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   12 +-
 .../container/common/helpers/ContainerMetrics.java |    6 +-
 .../container/common/impl/HddsDispatcher.java      |    5 +-
 .../commandhandler/DeleteBlocksCommandHandler.java |  191 ++-
 .../common/transport/server/XceiverServerGrpc.java |    8 +-
 .../common/transport/server/ratis/CSMMetrics.java  |    7 +-
 .../server/ratis/ContainerStateMachine.java        |   12 +-
 .../transport/server/ratis/XceiverServerRatis.java |   14 +-
 .../container/keyvalue/KeyValueContainer.java      |   46 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |    9 +-
 .../background/BlockDeletingService.java           |  171 ++-
 .../metadata/AbstractDatanodeDBDefinition.java     |    2 +-
 .../metadata/DatanodeSchemaOneDBDefinition.java    |    5 +
 .../metadata/DatanodeSchemaTwoDBDefinition.java    |   28 +-
 .../metadata/DatanodeStoreSchemaTwoImpl.java       |   14 +-
 ...mpl.java => DeletedBlocksTransactionCodec.java} |   35 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   39 +-
 .../replication/GrpcReplicationClient.java         |   15 +-
 .../container/replication/ReplicationServer.java   |  149 +++
 .../replication/ReplicationSupervisor.java         |    5 +-
 .../replication/SimpleContainerDownloader.java     |    6 +-
 .../container/common/TestBlockDeletingService.java |  279 +++-
 .../container/keyvalue/TestKeyValueContainer.java  |  145 +-
 .../replication/TestSimpleContainerDownloader.java |    4 +-
 hadoop-hdds/docs/content/design/decommissioning.md |   10 +-
 hadoop-hdds/docs/content/interface/S3.md           |    6 +-
 hadoop-hdds/docs/content/interface/S3.zh.md        |   19 +
 hadoop-hdds/docs/content/tools/AuditParser.md      |    2 +-
 hadoop-hdds/docs/dev-support/bin/generate-site.sh  |    2 +-
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |   15 +
 .../SCMSecurityProtocolClientSideTranslatorPB.java |   27 +
 .../hdds/security/token/BlockTokenVerifier.java    |   44 +-
 .../certificate/authority/CertificateServer.java   |   12 +
 .../certificate/authority/CertificateStore.java    |   16 +
 .../certificate/authority/DefaultCAServer.java     |   19 +
 .../x509/certificate/client/CertificateClient.java |   12 +
 .../client/DefaultCertificateClient.java           |   30 +-
 .../server/OzoneProtocolMessageDispatcher.java     |    5 +-
 .../server/http/RatisNameRewriteSampleBuilder.java |    2 +-
 .../hadoop/hdds/utils/db/DBConfigFromFile.java     |   12 +-
 .../x509/certificate/authority/MockCAStore.java    |   11 +
 .../apache/hadoop/hdds/server/TestJsonUtils.java   |   14 +-
 .../server/http/TestRatisDropwizardExports.java    |    9 +-
 .../interface-client/src/main/proto/hdds.proto     |   10 +-
 .../src/main/proto/ScmServerSecurityProtocol.proto |   26 +
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |    3 +-
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java |  130 +-
 .../hdds/scm/block/DeletedBlockLogImplV2.java      |    2 +
 .../hdds/scm/container/SCMContainerManager.java    |    6 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   17 +-
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |   14 -
 .../hdds/scm/metadata/SCMMetadataStoreImpl.java    |   30 -
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |   22 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   19 +-
 .../SCMSecurityProtocolServerSideTranslatorPB.java |   25 +
 .../hdds/scm/safemode/ContainerSafeModeRule.java   |   19 +-
 .../hdds/scm/safemode/DataNodeSafeModeRule.java    |    5 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |    7 +-
 .../safemode/OneReplicaPipelineSafeModeRule.java   |   24 +-
 .../hadoop/hdds/scm/server/SCMCertStore.java       |   42 +
 .../hdds/scm/server/SCMClientProtocolServer.java   |   14 +-
 .../apache/hadoop/hdds/scm/server/SCMMXBean.java   |    6 +-
 .../hdds/scm/server/SCMSecurityProtocolServer.java |   32 +-
 .../hdds/scm/server/StorageContainerManager.java   |    8 +-
 .../main/resources/webapps/scm/scm-overview.html   |   26 +-
 .../src/main/resources/webapps/scm/scm.js          |   25 +-
 .../scm/TestStorageContainerManagerHttpServer.java |    2 +
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |   54 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |    2 +
 .../hdds/scm/node/states/TestNodeStateMap.java     |   65 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |    8 +-
 ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java |   10 -
 .../choose/algorithms/TestLeaderChoosePolicy.java  |    2 +-
 .../ozone/container/common/TestEndPoint.java       |   54 +-
 .../hdds/scm/cli/SafeModeWaitSubcommand.java       |   20 +-
 .../org/apache/hadoop/hdds/scm/cli/ScmOption.java  |   13 +
 .../hadoop/hdds/scm/cli/cert/CertCommands.java     |   61 +
 .../hadoop/hdds/scm/cli/cert/InfoSubcommand.java   |   73 +
 .../hadoop/hdds/scm/cli/cert/ListSubcommand.java   |  102 ++
 .../hdds/scm/cli/cert/ScmCertSubcommand.java       |   35 +-
 .../hadoop/hdds/scm/cli/cert/package-info.java     |   29 +-
 .../org/apache/hadoop/ozone/client/BucketArgs.java |    3 +
 .../org/apache/hadoop/ozone/client/VolumeArgs.java |    8 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   40 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   11 +-
 .../ozone/om/ha/OMFailoverProxyProvider.java       |    4 +-
 .../hadoop/ozone/om/helpers/OmBucketArgs.java      |    8 +
 .../hadoop/ozone/security/acl/RequestContext.java  |   48 +-
 hadoop-ozone/dev-support/checks/acceptance.sh      |    2 +-
 hadoop-ozone/dev-support/checks/blockade.sh        |    2 +-
 hadoop-ozone/dev-support/checks/hadolint.sh        |   28 +-
 hadoop-ozone/dev-support/checks/kubernetes.sh      |    2 +-
 hadoop-ozone/dev-support/intellij/ozone-site.xml   |    4 +
 .../dist/dev-support/bin/dist-layout-stitching     |   14 +-
 .../main/compose/{ozone-ha => compatibility}/.env  |    1 -
 .../{ozone => compatibility}/docker-compose.yaml   |    5 -
 .../compose/{ozone => compatibility}/docker-config |    8 +-
 .../main/compose/{ozone => compatibility}/test.sh  |   34 +-
 hadoop-ozone/dist/src/main/compose/ozone-csi/.env  |    2 +-
 .../src/main/compose/ozone-csi/docker-compose.yaml |    8 +-
 .../dist/src/main/compose/ozone-csi/docker-config  |    3 +
 hadoop-ozone/dist/src/main/compose/ozone-ha/.env   |    2 +-
 .../dist/src/main/compose/ozone-ha/docker-config   |    4 +
 .../dist/src/main/compose/ozone-mr/common-config   |    3 +
 .../compose/ozone-mr/hadoop27/docker-compose.yaml  |    5 +-
 .../main/compose/ozone-mr/hadoop27/docker-config   |    3 +
 .../dist/src/main/compose/ozone-mr/hadoop31/.env   |    2 +-
 .../compose/ozone-mr/hadoop31/docker-compose.yaml  |   13 +-
 .../main/compose/ozone-mr/hadoop31/docker-config   |    3 +
 .../dist/src/main/compose/ozone-mr/hadoop32/.env   |    4 +-
 .../compose/ozone-mr/hadoop32/docker-compose.yaml  |   11 +-
 .../main/compose/ozone-mr/hadoop32/docker-config   |    3 +
 .../dist/src/main/compose/ozone-om-ha-s3/.env      |    2 +-
 .../compose/ozone-om-ha-s3/docker-compose.yaml     |   12 +-
 .../src/main/compose/ozone-om-ha-s3/docker-config  |    3 +
 .../src/main/compose/ozone-om-ha/docker-config     |    3 +
 .../dist/src/main/compose/ozone-topology/.env      |    2 +-
 .../compose/ozone-topology/docker-compose.yaml     |   16 +-
 .../src/main/compose/ozone-topology/docker-config  |    3 +
 hadoop-ozone/dist/src/main/compose/ozone/.env      |    2 +-
 hadoop-ozone/dist/src/main/compose/ozone/README.md |    2 +-
 .../src/main/compose/ozone/docker-compose.yaml     |   10 +-
 .../dist/src/main/compose/ozone/docker-config      |    5 +
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |    2 +
 .../src/main/compose/ozoneblockade/docker-config   |    3 +
 .../src/main/compose/ozones3-haproxy/docker-config |    3 +
 .../src/main/compose/ozonescripts/docker-config    |    3 +
 .../dist/src/main/compose/ozonescripts/start.sh    |    8 +-
 .../dist/src/main/compose/ozonescripts/stop.sh     |    2 +-
 .../main/compose/{ozone => ozonescripts}/test.sh   |   36 +-
 .../dist/src/main/compose/ozonesecure-mr/.env      |    4 +-
 .../compose/ozonesecure-mr/docker-compose.yaml     |   16 +-
 .../src/main/compose/ozonesecure-mr/docker-config  |    5 +
 .../dist/src/main/compose/ozonesecure-om-ha/.env   |    2 +-
 .../compose/ozonesecure-om-ha/docker-compose.yaml  |   20 +-
 .../main/compose/ozonesecure-om-ha/docker-config   |    3 +
 .../dist/src/main/compose/ozonesecure/.env         |    2 +-
 .../main/compose/ozonesecure/docker-compose.yaml   |   28 +-
 .../src/main/compose/ozonesecure/docker-config     |    5 +
 hadoop-ozone/dist/src/main/compose/test-all.sh     |    2 +-
 .../src/main/compose/upgrade/docker-compose.yaml   |    7 +
 .../dist/src/main/compose/upgrade/docker-config    |    3 +
 .../src/main/k8s/definitions/ozone/config.yaml     |    1 +
 .../src/main/smoketest/basic/ozone-shell-lib.robot |   10 -
 .../dist/src/main/smoketest/cli/classpath.robot    |   46 +
 .../dist/src/main/smoketest/cli/envvars.robot      |   77 ++
 .../smoketest/compatibility/dn.robot}              |   16 +-
 .../smoketest/compatibility/om.robot}              |   16 +-
 .../smoketest/compatibility/recon.robot}           |   15 +-
 .../src/main/smoketest/compatibility/scm.robot}    |   23 +-
 hadoop-ozone/dist/src/main/smoketest/lib/os.robot  |    4 +
 .../dist/src/main/smoketest/mapreduce.robot        |    2 +-
 .../dist/src/main/smoketest/ozone-lib/shell.robot  |    4 +
 .../dist/src/main/smoketest/ozonefs/ozonefs.robot  |   15 +-
 .../src/main/smoketest/security/admin-cert.robot   |   44 +
 hadoop-ozone/dist/src/shell/hdds/hadoop-config.cmd |  317 -----
 hadoop-ozone/dist/src/shell/hdds/hadoop-config.sh  |  165 ---
 hadoop-ozone/dist/src/shell/hdds/workers.sh        |   47 +-
 hadoop-ozone/dist/src/shell/ozone/ozone            |  264 ++--
 hadoop-ozone/dist/src/shell/ozone/ozone-config.sh  |  100 +-
 .../ozone-functions.sh}                            | 1397 ++++++++++----------
 hadoop-ozone/dist/src/shell/ozone/start-ozone.sh   |   84 +-
 hadoop-ozone/dist/src/shell/ozone/stop-ozone.sh    |   60 +-
 .../shell/shellprofile.d/hadoop-ozone-manager.sh   |    8 +-
 .../dist/src/shell/shellprofile.d/hadoop-ozone.sh  |    6 +-
 hadoop-ozone/dist/src/test/shell/gc_opts.bats      |   40 +-
 .../shell/ozone-functions_test_helper.bash}        |   29 +-
 .../shell/ozone_set_var_for_compatibility.bats     |   86 ++
 .../fs/ozone/TestOzoneFSWithObjectStoreCreate.java |    4 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  301 ++---
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |   36 +-
 .../hadoop/hdds/scm/pipeline/TestNodeFailure.java  |    2 +
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |    2 +
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |    1 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |    5 +
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   19 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |    2 +
 .../hadoop/ozone/TestOzoneConfigurationFields.java |    2 +
 .../hadoop/ozone/TestStorageContainerManager.java  |    3 +-
 .../ozone/TestStorageContainerManagerHelper.java   |   30 +
 .../ozone/client/CertificateClientTestImpl.java    |   11 +
 .../rpc/TestBlockOutputStreamWithFailures.java     |    3 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |    2 +
 .../client/rpc/TestContainerStateMachine.java      |    2 +
 .../client/rpc/TestDeleteWithSlowFollower.java     |    1 +
 .../client/rpc/TestDiscardPreallocatedBlocks.java  |    1 +
 .../ozone/client/rpc/TestKeyInputStream.java       |   16 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  214 +--
 .../ozone/client/rpc/TestWatchForCommit.java       |    5 +-
 .../container/metrics/TestContainerMetrics.java    |   13 +-
 .../container/server/TestContainerServer.java      |   75 +-
 .../server/TestSecureContainerServer.java          |   57 +-
 ...gerRestart.java => TestOMEpochForNonRatis.java} |  154 +--
 .../ozone/om/TestOzoneManagerConfiguration.java    |   13 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |   47 +-
 .../hadoop/ozone/om/TestOzoneManagerRestart.java   |  101 --
 .../scm/node/TestDecommissionAndMaintenance.java   |   37 +-
 .../hadoop/ozone/shell/TestOzoneShellHA.java       |  222 +++-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   10 +
 .../apache/hadoop/ozone/om/KeyDeletingService.java |   15 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |   43 +-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |   85 ++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   10 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   26 +-
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |  496 +++++++
 .../apache/hadoop/ozone/om/TrashPolicyOzone.java   |   20 +-
 .../apache/hadoop/ozone/om/ha/OMHANodeDetails.java |   14 +-
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |    4 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |   35 +-
 .../request/bucket/OMBucketSetPropertyRequest.java |    9 +-
 .../om/request/bucket/acl/OMBucketAclRequest.java  |   26 +-
 .../request/bucket/acl/OMBucketAddAclRequest.java  |   20 +-
 .../bucket/acl/OMBucketRemoveAclRequest.java       |   20 +-
 .../request/bucket/acl/OMBucketSetAclRequest.java  |   20 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |    4 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   17 +-
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |   16 +-
 .../om/request/key/acl/OMKeyAddAclRequest.java     |   24 +-
 .../om/request/key/acl/OMKeyRemoveAclRequest.java  |   24 +-
 .../om/request/key/acl/OMKeySetAclRequest.java     |   24 +-
 .../request/key/acl/prefix/OMPrefixAclRequest.java |    9 +-
 .../key/acl/prefix/OMPrefixAddAclRequest.java      |   13 +-
 .../key/acl/prefix/OMPrefixRemoveAclRequest.java   |   13 +-
 .../key/acl/prefix/OMPrefixSetAclRequest.java      |   13 +-
 .../om/request/volume/OMVolumeSetQuotaRequest.java |    6 +-
 .../om/request/volume/acl/OMVolumeAclRequest.java  |   20 +-
 .../request/volume/acl/OMVolumeAddAclRequest.java  |   19 +-
 .../volume/acl/OMVolumeRemoveAclRequest.java       |   19 +-
 .../request/volume/acl/OMVolumeSetAclRequest.java  |   20 +-
 .../security/OzoneBlockTokenSecretManager.java     |    8 +-
 .../OzoneDelegationTokenSecretManager.java         |   24 +-
 .../ozone/om/TestOzoneManagerHttpServer.java       |    2 +
 .../om/ratis/TestOzoneManagerRatisServer.java      |   12 +-
 .../security/TestOzoneBlockTokenSecretManager.java |  197 ++-
 .../TestOzoneDelegationTokenSecretManager.java     |   10 +-
 .../ozone/security/acl/TestRequestContext.java     |   94 ++
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |    1 +
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |    1 +
 .../recon/schema/ContainerSchemaDefinition.java    |   19 -
 .../hadoop/ozone/recon/ReconControllerModule.java  |   19 +-
 .../hadoop/ozone/recon/api/ContainerEndpoint.java  |   30 +-
 .../recon/api/types/MissingContainerMetadata.java  |    2 +-
 .../api/types/UnhealthyContainerMetadata.java      |    2 +-
 .../codec/ContainerReplicaHistoryListCodec.java    |   86 ++
 .../ozone/recon/fsck/ContainerHealthTask.java      |   14 +-
 ...ager.java => ContainerHealthSchemaManager.java} |   51 +-
 .../ozone/recon/persistence/ContainerHistory.java  |   79 ++
 .../ozone/recon/scm/ContainerReplicaHistory.java   |   62 +
 .../recon/scm/ContainerReplicaHistoryList.java}    |   39 +-
 .../ozone/recon/scm/ReconContainerManager.java     |  209 ++-
 .../scm/ReconStorageContainerManagerFacade.java    |   19 +-
 .../recon/spi/ContainerDBServiceProvider.java      |   33 +
 .../spi/impl/ContainerDBServiceProviderImpl.java   |   91 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |    4 +-
 .../ozone/recon/spi/impl/ReconDBDefinition.java    |   14 +-
 .../ozone/recon/tasks/ContainerKeyMapperTask.java  |    7 +-
 .../ozone/recon/tasks/FileSizeCountTask.java       |    6 +-
 .../ozone/recon/tasks/OMUpdateEventBatch.java      |   21 +-
 .../hadoop/ozone/recon/tasks/ReconOmTask.java      |    9 -
 .../ozone/recon/tasks/ReconTaskControllerImpl.java |    8 +-
 .../hadoop/ozone/recon/tasks/TableCountTask.java   |    7 +-
 .../ozone/recon/api/TestContainerEndpoint.java     |  181 ++-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |    5 +-
 .../ozone/recon/fsck/TestContainerHealthTask.java  |   12 +-
 .../scm/AbstractReconContainerManagerTest.java     |    6 +-
 .../ozone/recon/scm/TestReconContainerManager.java |  104 +-
 .../hadoop/ozone/recon/tasks/DummyReconDBTask.java |    1 -
 .../ozone/recon/tasks/TestFileSizeCountTask.java   |    8 +
 .../recon/tasks/TestReconTaskControllerImpl.java   |    7 -
 .../hadoop/ozone/s3/OzoneClientProducer.java       |    2 +-
 .../apache/hadoop/ozone/debug/ChunkKeyHandler.java |    4 -
 .../hadoop/ozone/freon/BaseFreonGenerator.java     |    8 +
 .../ozone/freon/ClosedContainerReplicator.java     |  213 +++
 .../hadoop/ozone/freon/DatanodeChunkGenerator.java |  147 +-
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |    3 +-
 .../hadoop/ozone/freon/RandomKeyGenerator.java     |    2 +-
 .../apache/hadoop/ozone/genesis/BenchMarkSCM.java  |    2 +-
 .../org/apache/hadoop/ozone/shell/Handler.java     |    4 +
 .../hadoop/ozone/shell/SetSpaceQuotaOptions.java   |    4 +-
 .../ozone/shell/bucket/CreateBucketHandler.java    |   13 +-
 .../hadoop/ozone/shell/bucket/SetQuotaHandler.java |   23 +-
 .../ozone/shell/volume/CreateVolumeHandler.java    |   13 +-
 .../hadoop/ozone/shell/volume/SetQuotaHandler.java |   22 +-
 pom.xml                                            |   10 +-
 301 files changed, 7390 insertions(+), 4115 deletions(-)

diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 5fd1690,3ecddac..e4fe60b
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@@ -86,7 -86,8 +87,9 @@@ public class OzoneContainer 
    private List<ContainerDataScanner> dataScanners;
    private final BlockDeletingService blockDeletingService;
    private final GrpcTlsConfig tlsClientConfig;
 +  private final AtomicBoolean isStarted;
+   private final ReplicationServer replicationServer;
+   private DatanodeDetails datanodeDetails;
  
    /**
     * Construct OzoneContainer object.
@@@ -244,12 -252,12 +256,16 @@@
     * @throws IOException
     */
    public void start(String scmId) throws IOException {
 +    if (!isStarted.compareAndSet(false, true)) {
 +      LOG.info("Ignore. OzoneContainer already started.");
 +      return;
 +    }
      LOG.info("Attempting to start container services.");
      startContainerScrub();
+ 
+     replicationServer.start();
+     datanodeDetails.setPort(Name.REPLICATION, replicationServer.getPort());
+ 
      writeChannel.start();
      readChannel.start();
      hddsDispatcher.init();
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 8acf985,657a1f7..d4190b8
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@@ -87,9 -90,9 +87,10 @@@ public class BlockManagerImpl implement
     * @throws IOException
     */
    public BlockManagerImpl(final ConfigurationSource conf,
-                           final StorageContainerManager scm) {
+                           final StorageContainerManager scm)
+       throws IOException {
      Objects.requireNonNull(scm, "SCM cannot be null");
 +    this.scm = scm;
      this.pipelineManager = scm.getPipelineManager();
      this.containerManager = scm.getContainerManager();
      this.pipelineChoosePolicy = scm.getPipelineChoosePolicy();
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index b61f135,b887ebc..aff0be2
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@@ -70,17 -72,27 +71,27 @@@ public class DeletedBlockLogImp
  
    public static final Logger LOG =
        LoggerFactory.getLogger(DeletedBlockLogImpl.class);
+   private static final DeletedBlocksTransaction.Builder DUMMY_TXN_BUILDER =
+       DeletedBlocksTransaction.newBuilder().setContainerID(1).setCount(1);
  
    private final int maxRetry;
 -  private final ContainerManager containerManager;
 +  private final ContainerManagerV2 containerManager;
    private final SCMMetadataStore scmMetadataStore;
    private final Lock lock;
    // Maps txId to set of DNs which are successful in committing the transaction
    private Map<Long, Set<UUID>> transactionToDNsCommitMap;
+   // Maps txId to its retry counts;
+   private Map<Long, Integer> transactionRetryCountMap;
+ 
+   private final AtomicLong largestTxnId;
+   // largest transactionId is stored at largestTxnIdHolderKey
+   private final long largestTxnIdHolderKey = 0L;
+ 
  
    public DeletedBlockLogImpl(ConfigurationSource conf,
 -                             ContainerManager containerManager,
 +                             ContainerManagerV2 containerManager,
-                              SCMMetadataStore scmMetadataStore) {
+                              SCMMetadataStore scmMetadataStore)
+       throws IOException {
      maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
          OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
      this.containerManager = containerManager;
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
index 179d228,b887ebc..86e32d3
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
@@@ -79,17 -81,18 +79,18 @@@ public class DeletedBlockLogImplV
    private final Lock lock;
    // Maps txId to set of DNs which are successful in committing the transaction
    private Map<Long, Set<UUID>> transactionToDNsCommitMap;
 -  // Maps txId to its retry counts;
 -  private Map<Long, Integer> transactionRetryCountMap;
 -
 -  private final AtomicLong largestTxnId;
 -  // largest transactionId is stored at largestTxnIdHolderKey
 -  private final long largestTxnIdHolderKey = 0L;
 +  // The access to DeletedBlocksTXTable is protected by
 +  // DeletedBlockLogStateManager.
 +  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
 +  private final SCMContext scmContext;
++  private final Table<Long, DeletedBlocksTransaction> deletedBlocksTXTable;
  
 -
 -  public DeletedBlockLogImpl(ConfigurationSource conf,
 -                             ContainerManager containerManager,
 -                             SCMMetadataStore scmMetadataStore)
 -      throws IOException {
 +  public DeletedBlockLogImplV2(ConfigurationSource conf,
 +      ContainerManagerV2 containerManager,
 +      SCMRatisServer ratisServer,
 +      Table<Long, DeletedBlocksTransaction> deletedBlocksTXTable,
 +      DBTransactionBuffer dbTxBuffer,
 +      SCMContext scmContext) {
      maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
          OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
      this.containerManager = containerManager;
@@@ -100,16 -104,46 +101,17 @@@
  
      // maps transaction to dns which have committed it.
      transactionToDNsCommitMap = new ConcurrentHashMap<>();
 -    transactionRetryCountMap = new ConcurrentHashMap<>();
 -    this.largestTxnId = new AtomicLong(this.getLargestRecordedTXID());
 -  }
 -
 -  public Long getNextDeleteBlockTXID() {
 -    return this.largestTxnId.incrementAndGet();
 +    this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl
 +        .newBuilder()
 +        .setConfiguration(conf)
 +        .setDeletedBlocksTable(deletedBlocksTXTable)
 +        .setRatisServer(ratisServer)
 +        .setSCMDBTransactionBuffer(dbTxBuffer)
 +        .build();
 +    this.scmContext = scmContext;
++    this.deletedBlocksTXTable = deletedBlocksTXTable;
    }
  
 -  public Long getCurrentTXID() {
 -    return this.largestTxnId.get();
 -  }
 -
 -  /**
 -   * Returns the largest recorded TXID from the DB.
 -   *
 -   * @return Long
 -   * @throws IOException
 -   */
 -  private long getLargestRecordedTXID() throws IOException {
 -    DeletedBlocksTransaction txn =
 -        scmMetadataStore.getDeletedBlocksTXTable().get(largestTxnIdHolderKey);
 -    long txnId = txn != null ? txn.getTxID() : 0L;
 -    if (txn == null) {
 -      // HDDS-4477 adds largestTxnIdHolderKey to table for storing largest
 -      // transactionId. In case the key does not exist, fetch largest
 -      // transactionId from existing transactions and update
 -      // largestTxnIdHolderKey with same.
 -      try (TableIterator<Long,
 -              ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> txIter =
 -              getIterator()) {
 -        txIter.seekToLast();
 -        txnId = txIter.key() != null ? txIter.key() : 0L;
 -        if (txnId > 0) {
 -          scmMetadataStore.getDeletedBlocksTXTable().put(largestTxnIdHolderKey,
 -              DUMMY_TXN_BUILDER.setTxID(txnId).build());
 -        }
 -      }
 -    }
 -    return txnId;
 -  }
  
    @Override
    public List<DeletedBlocksTransaction> getFailedTransactions()
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 15b2fe1,0000000..0071ea8
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@@ -1,261 -1,0 +1,260 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the 
License.
 + * You may obtain a copy of the License at
 + * <p/>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p/>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.ha;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.UnknownHostException;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
 +
 +import org.apache.hadoop.hdds.HddsUtils;
 +import org.apache.hadoop.hdds.conf.ConfigurationSource;
 +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 +import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 +import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 +import org.apache.ratis.conf.RaftProperties;
 +import org.apache.ratis.protocol.ClientId;
 +import org.apache.ratis.protocol.RaftClientReply;
 +import org.apache.ratis.protocol.RaftClientRequest;
 +import org.apache.ratis.protocol.RaftGroup;
 +import org.apache.ratis.protocol.RaftGroupId;
 +import org.apache.ratis.protocol.RaftPeer;
 +import org.apache.ratis.protocol.RaftPeerId;
 +import org.apache.ratis.protocol.exceptions.NotLeaderException;
 +import org.apache.ratis.server.RaftServer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * TODO.
 + */
 +public class SCMRatisServerImpl implements SCMRatisServer {
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(SCMRatisServerImpl.class);
 +
 +  private final RaftServer.Division division;
 +  private final StorageContainerManager scm;
 +  private final InetSocketAddress address;
 +  private final ClientId clientId = ClientId.randomId();
 +  private final AtomicLong callId = new AtomicLong();
 +
 +  // TODO: Refactor and remove ConfigurationSource and use only
 +  //  SCMHAConfiguration.
 +  SCMRatisServerImpl(final SCMHAConfiguration haConf,
 +      final ConfigurationSource conf, final StorageContainerManager scm,
 +      final DBTransactionBuffer buffer) throws IOException {
 +    this.scm = scm;
 +    this.address = haConf.getRatisBindAddress();
 +
 +    SCMHAGroupBuilder haGrpBuilder = new SCMHAGroupBuilder(haConf, conf);
 +
 +    final RaftProperties serverProperties = RatisUtil
 +        .newRaftProperties(haConf, conf);
 +
 +    RaftServer server = RaftServer.newBuilder()
 +        .setServerId(haGrpBuilder.getPeerId())
 +        .setGroup(haGrpBuilder.getRaftGroup())
 +        .setProperties(serverProperties)
 +        .setStateMachine(new SCMStateMachine(scm, this, buffer))
 +        .build();
 +
 +    this.division = server.getDivision(haGrpBuilder.getRaftGroupId());
 +  }
 +
 +  @Override
 +  public void start() throws IOException {
 +    division.getRaftServer().start();
 +  }
 +
 +  @Override
 +  public void registerStateMachineHandler(final RequestType handlerType,
 +                                          final Object handler) {
 +    ((SCMStateMachine) division.getStateMachine())
 +        .registerHandler(handlerType, handler);
 +  }
 +
 +  @Override
 +  public SCMRatisResponse submitRequest(SCMRatisRequest request)
 +      throws IOException, ExecutionException, InterruptedException {
-     final RaftClientRequest raftClientRequest =
-         new RaftClientRequest(
-             clientId,
-             division.getId(),
-             division.getGroup().getGroupId(),
-             nextCallId(),
-             request.encode(),
-             RaftClientRequest.writeRequestType(),
-             null);
++    final RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
++        .setClientId(clientId)
++        .setServerId(division.getId())
++        .setGroupId(division.getGroup().getGroupId())
++        .setCallId(nextCallId())
++        .setMessage(request.encode())
++        .setType(RaftClientRequest.writeRequestType())
++        .build();
 +    final RaftClientReply raftClientReply =
 +        division.getRaftServer()
 +            .submitClientRequestAsync(raftClientRequest)
 +            .get();
 +    return SCMRatisResponse.decode(raftClientReply);
 +  }
 +
 +  private long nextCallId() {
 +    return callId.getAndIncrement() & Long.MAX_VALUE;
 +  }
 +
 +  @Override
 +  public void stop() throws IOException {
 +    division.getRaftServer().close();
 +  }
 +
 +  @Override
 +  public RaftServer.Division getDivision() {
 +    return division;
 +  }
 +
 +  @Override
 +  public List<String> getRatisRoles() {
 +    return division.getGroup().getPeers().stream()
 +        .map(peer -> peer.getAddress() == null ? "" : peer.getAddress())
 +        .collect(Collectors.toList());
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public NotLeaderException triggerNotLeaderException() {
 +    return new NotLeaderException(
 +        division.getMemberId(), null, division.getGroup().getPeers());
 +  }
 +
 +  /**
 +   * If the SCM group starts from {@link ScmConfigKeys#OZONE_SCM_NAMES},
 +   * its raft peers should locate on different nodes, and use the same port
 +   * to communicate with each other.
 +   *
 +   * Each of the raft peer figures out its {@link RaftPeerId} by computing
 +   * its position in {@link ScmConfigKeys#OZONE_SCM_NAMES}.
 +   *
 +   * Assume {@link ScmConfigKeys#OZONE_SCM_NAMES} is "ip0,ip1,ip2",
 +   * scm with ip0 identifies its {@link RaftPeerId} as scm0,
 +   * scm with ip1 identifies its {@link RaftPeerId} as scm1,
 +   * scm with ip2 identifies its {@link RaftPeerId} as scm2.
 +   *
 +   * After startup, they will form a {@link RaftGroup} with groupID
 +   * "SCM-HA-Service", and communicate with each other via
 +   * ozone.scm.ha.ratis.bind.port.
 +   */
 +  private static class SCMHAGroupBuilder {
 +    private final static String SCM_SERVICE_ID = "SCM-HA-Service";
 +
 +    private final RaftGroupId raftGroupId;
 +    private final RaftGroup raftGroup;
 +    private RaftPeerId selfPeerId;
 +
 +    /**
 +     * @return raft group
 +     */
 +    public RaftGroup getRaftGroup() {
 +      return raftGroup;
 +    }
 +
 +    /**
 +     * @return raft group id
 +     */
 +    public RaftGroupId getRaftGroupId() {
 +      return raftGroupId;
 +    }
 +
 +    /**
 +     * @return raft peer id
 +     */
 +    public RaftPeerId getPeerId() {
 +      return selfPeerId;
 +    }
 +
 +    SCMHAGroupBuilder(final SCMHAConfiguration haConf,
 +                      final ConfigurationSource conf) throws IOException {
 +      // fetch port
 +      int port = haConf.getRatisBindAddress().getPort();
 +
 +      // fetch localhost
 +      InetAddress localHost = InetAddress.getLocalHost();
 +
 +      // fetch hosts from ozone.scm.names
 +      List<String> hosts = parseHosts(conf);
 +
 +      final List<RaftPeer> raftPeers = new ArrayList<>();
 +      for (int i = 0; i < hosts.size(); ++i) {
 +        String nodeId = "scm" + i;
 +        RaftPeerId peerId = RaftPeerId.getRaftPeerId(nodeId);
 +
 +        String host = hosts.get(i);
 +        if (InetAddress.getByName(host).equals(localHost)) {
 +          selfPeerId = peerId;
 +        }
 +
 +        raftPeers.add(RaftPeer.newBuilder()
 +            .setId(peerId)
 +            .setAddress(host + ":" + port)
 +            .build());
 +      }
 +
 +      if (selfPeerId == null) {
 +        String errorMessage = "localhost " +  localHost
 +            + " does not exist in ozone.scm.names "
 +            + conf.get(ScmConfigKeys.OZONE_SCM_NAMES);
 +        throw new IOException(errorMessage);
 +      }
 +
 +      LOG.info("Build a RaftGroup for SCMHA, " +
 +              "localHost: {}, OZONE_SCM_NAMES: {}, selfPeerId: {}",
 +          localHost, conf.get(ScmConfigKeys.OZONE_SCM_NAMES), selfPeerId);
 +
 +      raftGroupId = RaftGroupId.valueOf(UUID.nameUUIDFromBytes(
 +          SCM_SERVICE_ID.getBytes(StandardCharsets.UTF_8)));
 +
 +      raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
 +    }
 +
 +    private List<String> parseHosts(final ConfigurationSource conf)
 +        throws UnknownHostException {
 +      // fetch hosts from ozone.scm.names
 +      List<String> hosts =
 +          Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES))
 +              .map(scmName -> HddsUtils.getHostName(scmName).get())
 +              .collect(Collectors.toList());
 +
 +      // if this is not a conf for a multi-server raft cluster,
 +      // it means we are in integration test, and need to augment
 +      // the conf to help build a single-server raft cluster.
 +      if (hosts.size() == 0) {
 +        // ozone.scm.names is not set
 +        hosts.add(InetAddress.getLocalHost().getHostName());
 +      } else if (hosts.size() == 1) {
 +        // ozone.scm.names is set, yet the conf may not be usable.
 +        hosts.set(0, InetAddress.getLocalHost().getHostName());
 +      }
 +
 +      LOG.info("fetch hosts {} from ozone.scm.names {}.",
 +          hosts, conf.get(ScmConfigKeys.OZONE_SCM_NAMES));
 +      return hosts;
 +    }
 +  }
 +}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index ef6d102,d490e95..a9c13a2
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@@ -262,8 -239,57 +262,49 @@@ public class TestDeletedBlockLog 
    }
  
    @Test
+   public void testIncrementCountLessFrequentWritingToDB() throws Exception {
+     OzoneConfiguration testConf = OzoneConfiguration.of(conf);
+     testConf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 120);
+ 
 -    deletedBlockLog = new DeletedBlockLogImpl(testConf, containerManager,
 -        scm.getScmMetadataStore());
 -
 -    for (Map.Entry<Long, List<Long>> entry :
 -        generateData(1).entrySet()) {
 -      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
 -    }
++    deletedBlockLog.addTransactions(generateData(1));
+ 
+     List<DeletedBlocksTransaction> blocks =
+         getTransactions(40 * BLOCKS_PER_TXN);
+     List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
+         .collect(Collectors.toList());
+ 
+     for (int i = 0; i < 50; i++) {
+       deletedBlockLog.incrementCount(txIDs);
+     }
+     blocks = getTransactions(40 * BLOCKS_PER_TXN);
+     for (DeletedBlocksTransaction block : blocks) {
+       // block count should not be updated as there are only 50 retries.
+       Assert.assertEquals(0, block.getCount());
+     }
+ 
+     for (int i = 0; i < 60; i++) {
+       deletedBlockLog.incrementCount(txIDs);
+     }
+     blocks = getTransactions(40 * BLOCKS_PER_TXN);
+     for (DeletedBlocksTransaction block : blocks) {
+       // block count should be updated to 100 as there are already 110 retries.
+       Assert.assertEquals(100, block.getCount());
+     }
+ 
+     for (int i = 0; i < 50; i++) {
+       deletedBlockLog.incrementCount(txIDs);
+     }
+     blocks = getTransactions(40 * BLOCKS_PER_TXN);
+     for (DeletedBlocksTransaction block : blocks) {
+       // block count should be updated to -1 as retry count exceeds maxRetry
+       // (i.e. 160 > maxRetry which is 120).
+       Assert.assertEquals(-1, block.getCount());
+     }
+   }
+ 
+   @Test
    public void testCommitTransactions() throws Exception {
 -    for (Map.Entry<Long, List<Long>> entry : generateData(50).entrySet()){
 -      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
 -    }
 +    addTransactions(generateData(50));
      List<DeletedBlocksTransaction> blocks =
          getTransactions(20 * BLOCKS_PER_TXN);
      // Add an invalid txn.
@@@ -346,6 -375,15 +387,17 @@@
      blocks = getTransactions(BLOCKS_PER_TXN * 40);
      Assert.assertEquals(40, blocks.size());
      commitTransactions(blocks);
+ 
+     // close db and reopen it again to make sure
+     // currentTxnID = 50
+     deletedBlockLog.close();
 -    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
 -        scm.getScmMetadataStore());
++    new DeletedBlockLogImplV2(conf, containerManager,
++        MockSCMHAManager.getInstance(true).getRatisServer(),
++        scm.getScmMetadataStore().getDeletedBlocksTXTable(),
++        dbTransactionBuffer, SCMContext.emptyContext());
+     blocks = getTransactions(BLOCKS_PER_TXN * 40);
+     Assert.assertEquals(0, blocks.size());
 -    Assert.assertEquals((long)deletedBlockLog.getCurrentTXID(), 50L);
++    //Assert.assertEquals((long)deletedBlockLog.getCurrentTXID(), 50L);
    }
  
    @Test
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 23b7950,d7c7d2e..bd2959d
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@@ -42,9 -42,11 +42,10 @@@ import org.apache.hadoop.hdds.protocol.
      .StorageContainerDatanodeProtocolProtos.NodeReportProto;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 -import org.apache.hadoop.hdds.scm.HddsTestUtils;
+ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
  import org.apache.hadoop.hdds.scm.TestUtils;
  import org.apache.hadoop.hdds.scm.container.ContainerID;
 -import org.apache.hadoop.hdds.scm.container.ContainerManager;
 +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
  import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
  import org.apache.hadoop.hdds.scm.container.ContainerReplica;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
index 25601f7,5954f08..b2b3a65
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
@@@ -119,8 -122,53 +122,53 @@@ public class TestNodeStateMap 
          map.getNodeCount(NodeOperationalState.DECOMMISSIONING, null));
    }
  
-   private void addNodeWithState(DatanodeDetails dn,
-       NodeOperationalState opState, NodeState health)
+   /**
+    * Test if container list is iterable even if it's modified from other thread.
+    */
+   @Test
+   public void testConcurrency() throws Exception {
+     NodeStateMap nodeStateMap = new NodeStateMap();
+ 
+     final DatanodeDetails datanodeDetails =
+         MockDatanodeDetails.randomDatanodeDetails();
+ 
+     nodeStateMap.addNode(datanodeDetails, NodeStatus.inServiceHealthy());
+ 
+     UUID dnUuid = datanodeDetails.getUuid();
+ 
 -    nodeStateMap.addContainer(dnUuid, new ContainerID(1L));
 -    nodeStateMap.addContainer(dnUuid, new ContainerID(2L));
 -    nodeStateMap.addContainer(dnUuid, new ContainerID(3L));
++    nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(1L));
++    nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(2L));
++    nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(3L));
+ 
+     CountDownLatch elementRemoved = new CountDownLatch(1);
+     CountDownLatch loopStarted = new CountDownLatch(1);
+ 
+     new Thread(() -> {
+       try {
+         loopStarted.await();
 -        nodeStateMap.removeContainer(dnUuid, new ContainerID(1L));
++        nodeStateMap.removeContainer(dnUuid, ContainerID.valueOf(1L));
+         elementRemoved.countDown();
+       } catch (Exception e) {
+         e.printStackTrace();
+       }
+ 
+     }).start();
+ 
+     boolean first = true;
+     for (ContainerID key : nodeStateMap.getContainers(dnUuid)) {
+       if (first) {
+         loopStarted.countDown();
+         elementRemoved.await();
+       }
+       first = false;
+       System.out.println(key);
+     }
+   }
+ 
+   private void addNodeWithState(
+       DatanodeDetails dn,
+       NodeOperationalState opState, NodeState health
+   )
        throws NodeAlreadyExistsException {
      NodeStatus status = new NodeStatus(opState, health);
      map.addNode(dn, status);
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
index 13b7d99,8c4df07..60afe2d
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
@@@ -52,11 -51,10 +52,11 @@@ public class TestLeaderChoosePolicy 
          mock(NodeManager.class),
          mock(PipelineStateManager.class),
          conf,
 -        mock(EventPublisher.class));
 +        mock(EventPublisher.class),
 +        SCMContext.emptyContext());
      Assert.assertSame(
          ratisPipelineProvider.getLeaderChoosePolicy().getClass(),
-         DefaultLeaderChoosePolicy.class);
+         MinLeaderCountChoosePolicy.class);
    }
  
    @Test(expected = RuntimeException.class)
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 49e449e,3e8628f..923a896
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@@ -19,8 -19,9 +19,9 @@@
  package org.apache.hadoop.hdds.scm.pipeline;
  
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 -import org.apache.hadoop.hdds.scm.container.ContainerManager;
 +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
  import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
  import org.apache.hadoop.ozone.MiniOzoneCluster;
  import org.junit.AfterClass;
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 3eb66b9,edb6f2b..159801a
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@@ -54,8 -54,8 +54,10 @@@ public class TestOzoneConfigurationFiel
      errorIfMissingXmlProps = true;
      xmlPropsToSkipCompare.add("hadoop.tags.custom");
      xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
 +    xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID");
 +    xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms");
+     xmlPropsToSkipCompare.add("ozone.om.leader.election.minimum.timeout" +
+         ".duration"); // Deprecated config
      addPropertiesNotInXml();
    }
  
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 75b6c8c,c42f5a8..bcf9e47
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@@ -22,9 -22,10 +22,10 @@@ import org.apache.hadoop.hdds.client.Re
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
  import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 -import org.apache.hadoop.hdds.scm.container.ContainerManager;
 +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
  import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
  import org.apache.hadoop.hdds.scm.container.ContainerReplica;
  import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
diff --cc hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index b0acd1c,5cd6ec8..9ed0d44
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@@ -84,15 -84,15 +84,16 @@@ public class ContainerEndpoint 
    @Inject
    private ReconOMMetadataManager omMetadataManager;
  
 -  private final ReconContainerManager containerManager;
 +  // TODO: Fix ME, This has to be ReconContainerManager
-   private ContainerManagerV2 containerManager;
-   private ContainerSchemaManager containerSchemaManager;
++  private ReconContainerManager containerManager;
+   private final ContainerHealthSchemaManager containerHealthSchemaManager;
  
    @Inject
    public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
-                            ContainerSchemaManager containerSchemaManager) {
-     this.containerManager = reconSCM.getContainerManager();
-     this.containerSchemaManager = containerSchemaManager;
+       ContainerHealthSchemaManager containerHealthSchemaManager) {
 -    this.containerManager =
 -        (ReconContainerManager) reconSCM.getContainerManager();
++    this.containerManager = (ReconContainerManager) reconSCM
++        .getContainerManager();
+     this.containerHealthSchemaManager = containerHealthSchemaManager;
    }
  
    /**
diff --cc hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 9ce3fa8,9ceb5dd..40b87bb
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@@ -25,10 -25,10 +25,10 @@@ import java.util.Set
  import org.apache.hadoop.hdds.scm.PlacementPolicy;
  import org.apache.hadoop.hdds.scm.container.ContainerID;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 -import org.apache.hadoop.hdds.scm.container.ContainerManager;
 +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
  import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
  import org.apache.hadoop.hdds.scm.container.ContainerReplica;
- import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
+ import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
  import org.apache.hadoop.ozone.recon.scm.ReconScmTask;
  import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
  import org.apache.hadoop.util.Time;
@@@ -49,16 -49,16 +49,16 @@@ public class ContainerHealthTask extend
    private static final Logger LOG =
        LoggerFactory.getLogger(ContainerHealthTask.class);
  
 -  private ContainerManager containerManager;
 +  private ContainerManagerV2 containerManager;
-   private ContainerSchemaManager containerSchemaManager;
+   private ContainerHealthSchemaManager containerHealthSchemaManager;
    private PlacementPolicy placementPolicy;
    private final long interval;
    private Set<ContainerInfo> processedContainers = new HashSet<>();
  
    public ContainerHealthTask(
 -      ContainerManager containerManager,
 +      ContainerManagerV2 containerManager,
        ReconTaskStatusDao reconTaskStatusDao,
-       ContainerSchemaManager containerSchemaManager,
+       ContainerHealthSchemaManager containerHealthSchemaManager,
        PlacementPolicy placementPolicy,
        ReconTaskConfig reconTaskConfig) {
      super(reconTaskStatusDao);
diff --cc hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 73e260f,f80d6ad..745d4fb
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@@ -21,25 -22,33 +22,36 @@@ import static java.util.Comparator.comp
  import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE;
  
  import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.stream.Collectors;
  
 +import org.apache.hadoop.conf.Configuration;
+ import com.google.common.annotations.VisibleForTesting;
 -import org.apache.hadoop.hdds.conf.ConfigurationSource;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
  import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
  import org.apache.hadoop.hdds.scm.container.ContainerID;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 +import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
  import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
  import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+ import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
 -import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
  import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 +import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
 +import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
  import org.apache.hadoop.hdds.utils.db.DBStore;
  import org.apache.hadoop.hdds.utils.db.Table;
 +import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
- import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
+ import org.apache.hadoop.ozone.recon.persistence.ContainerHistory;
+ import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+ import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
  import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
  
  import org.slf4j.Logger;
@@@ -52,9 -61,12 +64,13 @@@ public class ReconContainerManager exte
  
    private static final Logger LOG =
        LoggerFactory.getLogger(ReconContainerManager.class);
 -  private final StorageContainerServiceProvider scmClient;
 +  private StorageContainerServiceProvider scmClient;
-   private ContainerSchemaManager containerSchemaManager;
 +  private PipelineManager pipelineManager;
+   private final ContainerHealthSchemaManager containerHealthSchemaManager;
+   private final ContainerDBServiceProvider cdbServiceProvider;
+   private final Table<UUID, DatanodeDetails> nodeDB;
+   // Container ID -> Datanode UUID -> Timestamp
+   private final Map<Long, Map<UUID, ContainerReplicaHistory>> replicaHistoryMap;
  
    /**
     * Constructs a mapping class that creates mapping between container names
@@@ -67,17 -79,21 +83,23 @@@
     * @throws IOException on Failure.
     */
    public ReconContainerManager(
 -      ConfigurationSource conf,
 +      Configuration conf,
 +      DBStore store,
        Table<ContainerID, ContainerInfo> containerStore,
 -      DBStore batchHandler,
        PipelineManager pipelineManager,
        StorageContainerServiceProvider scm,
-       ContainerSchemaManager containerSchemaManager) throws IOException {
+       ContainerHealthSchemaManager containerHealthSchemaManager,
+       ContainerDBServiceProvider containerDBServiceProvider)
+       throws IOException {
 -    super(conf, containerStore, batchHandler, pipelineManager);
 +    super(conf, MockSCMHAManager.getInstance(true,
 +        new MockDBTransactionBuffer(store)), pipelineManager, containerStore);
      this.scmClient = scm;
 +    this.pipelineManager = pipelineManager;
-     this.containerSchemaManager = containerSchemaManager;
+     this.containerHealthSchemaManager = containerHealthSchemaManager;
+     this.cdbServiceProvider = containerDBServiceProvider;
+     // batchHandler = scmDBStore
 -    this.nodeDB = ReconSCMDBDefinition.NODES.getTable(batchHandler);
++    this.nodeDB = ReconSCMDBDefinition.NODES.getTable(store);
+     this.replicaHistoryMap = new ConcurrentHashMap<>();
    }
  
    /**
diff --cc hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 80648cf,04c02ec..d23440d
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@@ -123,11 -119,9 +124,10 @@@ public class ReconStorageContainerManag
              ReconSCMDBDefinition.PIPELINES.getTable(dbStore),
              eventQueue);
      this.containerManager = new ReconContainerManager(conf,
 +        dbStore,
          ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
-         pipelineManager,
-         scmServiceProvider,
-         containerSchemaManager);
 -        dbStore, pipelineManager, scmServiceProvider,
++        pipelineManager, scmServiceProvider,
+         containerHealthSchemaManager, containerDBServiceProvider);
      this.scmServiceProvider = scmServiceProvider;
  
      NodeReportHandler nodeReportHandler =
diff --cc hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
index 5ff087e,49aa306..edfe589
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
@@@ -48,7 -52,6 +52,8 @@@ import org.apache.hadoop.hdds.protocol.
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
  import org.apache.hadoop.hdds.scm.container.ContainerID;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
++import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
  import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
  import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
@@@ -95,9 -100,9 +103,10 @@@ public class TestContainerEndpoint 
    private ContainerDBServiceProvider containerDbServiceProvider;
    private ContainerEndpoint containerEndpoint;
    private boolean isSetupDone = false;
-   private ContainerSchemaManager containerSchemaManager;
+   private ContainerHealthSchemaManager containerHealthSchemaManager;
    private ReconOMMetadataManager reconOMMetadataManager;
 -  private ContainerID containerID = new ContainerID(1L);
 +  private ContainerID containerID = ContainerID.valueOf(1L);
++  private Pipeline pipeline;
    private PipelineID pipelineID;
    private long keyCount = 5L;
  
@@@ -106,26 -116,9 +120,26 @@@
          initializeNewOmMetadataManager(temporaryFolder.newFolder()),
          temporaryFolder.newFolder());
  
--    Pipeline pipeline = getRandomPipeline();
++    pipeline = getRandomPipeline();
      pipelineID = pipeline.getId();
  
 +    // Mock ReconStorageContainerManagerFacade and other SCM related methods
 +    OzoneStorageContainerManager mockReconSCM =
 +        mock(ReconStorageContainerManagerFacade.class);
 +    ContainerManagerV2 mockContainerManager =
 +        mock(ContainerManagerV2.class);
 +
 +    when(mockContainerManager.getContainer(Mockito.any(ContainerID.class)))
 +        .thenReturn(
 +        new ContainerInfo.Builder()
 +            .setContainerID(containerID.getId())
 +            .setNumberOfKeys(keyCount)
 +            .setReplicationFactor(ReplicationFactor.THREE)
 +            .setPipelineID(pipelineID)
 +            .build());
 +    when(mockReconSCM.getContainerManager())
 +        .thenReturn(mockContainerManager);
 +
      ReconTestInjector reconTestInjector =
          new ReconTestInjector.Builder(temporaryFolder)
              .withReconSqlDb()
@@@ -154,7 -154,7 +175,7 @@@
        isSetupDone = true;
      }
      //Write Data to OM
--    Pipeline pipeline = getRandomPipeline();
++    pipeline = getRandomPipeline();
  
      List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
      BlockID blockID1 = new BlockID(1, 101);
@@@ -453,8 -457,29 +478,28 @@@
      });
    }
  
+   ContainerInfo newContainerInfo(long containerId) {
+     return new ContainerInfo.Builder()
+         .setContainerID(containerId)
+         .setReplicationType(HddsProtos.ReplicationType.RATIS)
+         .setState(HddsProtos.LifeCycleState.OPEN)
+         .setOwner("owner1")
+         .setNumberOfKeys(keyCount)
+         .setReplicationFactor(ReplicationFactor.THREE)
+         .setPipelineID(pipelineID)
+         .build();
+   }
+ 
+   void putContainerInfos(int num) throws IOException {
+     for (int i = 1; i <= num; i++) {
+       final ContainerInfo info = newContainerInfo(i);
 -      reconContainerManager.getContainerStore().put(new ContainerID(i), info);
 -      reconContainerManager.getContainerStateManager().addContainerInfo(
 -          i, info, null, null);
++      reconContainerManager.addNewContainer(i,
++          new ContainerWithPipeline(info, pipeline));
+     }
+   }
+ 
    @Test
-   public void testUnhealthyContainers() {
+   public void testUnhealthyContainers() throws IOException {
      Response response = containerEndpoint.getUnhealthyContainers(1000, 1);
  
      UnhealthyContainersResponse responseObject =
diff --cc hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index 8d9040d,469ddf9..a109c99
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@@ -41,10 -40,10 +41,10 @@@ import org.apache.hadoop.hdds.scm.Conta
  import org.apache.hadoop.hdds.scm.PlacementPolicy;
  import org.apache.hadoop.hdds.scm.container.ContainerID;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 -import org.apache.hadoop.hdds.scm.container.ContainerManager;
 +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
  import org.apache.hadoop.hdds.scm.container.ContainerReplica;
  import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
- import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
+ import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
  import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
  import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
  import org.apache.hadoop.test.LambdaTestUtils;
diff --cc hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index 93cf256,377c356..8e71d40
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@@ -83,11 -83,12 +84,12 @@@ public class AbstractReconContainerMana
          ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue);
      containerManager = new ReconContainerManager(
          conf,
 -        ReconSCMDBDefinition.CONTAINERS.getTable(store),
          store,
 +        ReconSCMDBDefinition.CONTAINERS.getTable(store),
          pipelineManager,
          getScmServiceProvider(),
-         mock(ContainerSchemaManager.class));
+         mock(ContainerHealthSchemaManager.class),
+         mock(ContainerDBServiceProvider.class));
    }
  
    @After
diff --cc hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
index fec673a,1fe32d1..3b09989
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
@@@ -22,6 -22,6 +22,7 @@@ import static org.apache.hadoop.hdds.pr
  import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
  import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
  import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
++import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
@@@ -35,8 -38,10 +39,11 @@@ import org.apache.hadoop.hdds.protocol.
  import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
  import org.apache.hadoop.hdds.scm.container.ContainerID;
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
  import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 -import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
++import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+ import org.junit.Assert;
  import org.junit.Test;
  
  /**
@@@ -45,6 -50,6 +52,8 @@@
  public class TestReconContainerManager
      extends AbstractReconContainerManagerTest {
  
++  private Pipeline pipeline = getRandomPipeline();
++
    @Test
    public void testAddNewOpenContainer() throws IOException {
      ContainerWithPipeline containerWithPipeline =
@@@ -144,4 -145,98 +153,97 @@@
      assertEquals(CLOSING,
          getContainerManager().getContainer(containerID).getState());
    }
- }
+ 
+   ContainerInfo newContainerInfo(long containerId) {
+     return new ContainerInfo.Builder()
+         .setContainerID(containerId)
+         .setReplicationType(HddsProtos.ReplicationType.RATIS)
+         .setState(HddsProtos.LifeCycleState.OPEN)
+         .setOwner("owner2")
+         .setNumberOfKeys(99L)
+         .setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
 -        .setPipelineID(PipelineID.randomId())
++        .setPipelineID(pipeline.getId())
+         .build();
+   }
+ 
+   void putContainerInfos(ReconContainerManager containerManager, int num)
+       throws IOException {
+     for (int i = 1; i <= num; i++) {
+       final ContainerInfo info = newContainerInfo(i);
 -      containerManager.getContainerStore().put(new ContainerID(i), info);
 -      containerManager.getContainerStateManager()
 -          .addContainerInfo(i, info, null, null);
++      containerManager.addNewContainer(i,
++          new ContainerWithPipeline(info, pipeline));
+     }
+   }
+ 
+   @Test
+   public void testUpdateAndRemoveContainerReplica() throws IOException {
+     // Sanity checking updateContainerReplica and ContainerReplicaHistory
+ 
+     // Init Container 1
+     final long cIDlong1 = 1L;
 -    final ContainerID containerID1 = new ContainerID(cIDlong1);
++    final ContainerID containerID1 = ContainerID.valueOf(cIDlong1);
+ 
+     // Init DN01
+     final UUID uuid1 = UUID.randomUUID();
+     final DatanodeDetails datanodeDetails1 = DatanodeDetails.newBuilder()
+         .setUuid(uuid1).setHostName("host1").setIpAddress("127.0.0.1").build();
+     final ContainerReplica containerReplica1 = ContainerReplica.newBuilder()
+         .setContainerID(containerID1).setContainerState(State.OPEN)
+         .setDatanodeDetails(datanodeDetails1).build();
+ 
+     final ReconContainerManager containerManager = getContainerManager();
+     final Map<Long, Map<UUID, ContainerReplicaHistory>> repHistMap =
+         containerManager.getReplicaHistoryMap();
+     // Should be empty at the beginning
+     Assert.assertEquals(0, repHistMap.size());
+ 
+     // Put a replica info and call updateContainerReplica
+     putContainerInfos(containerManager, 10);
+     containerManager.updateContainerReplica(containerID1, containerReplica1);
+     // Should have 1 container entry in the replica history map
+     Assert.assertEquals(1, repHistMap.size());
+     // Should only have 1 entry for this replica (on DN01)
+     Assert.assertEquals(1, repHistMap.get(cIDlong1).size());
+     ContainerReplicaHistory repHist1 = repHistMap.get(cIDlong1).get(uuid1);
+     Assert.assertEquals(uuid1, repHist1.getUuid());
+     // Because this is a new entry, first seen time equals last seen time
+     assertEquals(repHist1.getLastSeenTime(), repHist1.getFirstSeenTime());
+ 
+     // Let's update the entry again
+     containerManager.updateContainerReplica(containerID1, containerReplica1);
+     // Should still have 1 entry in the replica history map
+     Assert.assertEquals(1, repHistMap.size());
+     // Now last seen time should be larger than first seen time
+     Assert.assertTrue(repHist1.getLastSeenTime() > repHist1.getFirstSeenTime());
+ 
+     // Init DN02
+     final UUID uuid2 = UUID.randomUUID();
+     final DatanodeDetails datanodeDetails2 = DatanodeDetails.newBuilder()
+         .setUuid(uuid2).setHostName("host2").setIpAddress("127.0.0.2").build();
+     final ContainerReplica containerReplica2 = ContainerReplica.newBuilder()
+         .setContainerID(containerID1).setContainerState(State.OPEN)
+         .setDatanodeDetails(datanodeDetails2).build();
+ 
+     // Add replica to DN02
+     containerManager.updateContainerReplica(containerID1, containerReplica2);
+ 
+     // Should still have 1 container entry in the replica history map
+     Assert.assertEquals(1, repHistMap.size());
+     // Should have 2 entries for this replica (on DN01 and DN02)
+     Assert.assertEquals(2, repHistMap.get(cIDlong1).size());
+     ContainerReplicaHistory repHist2 = repHistMap.get(cIDlong1).get(uuid2);
+     Assert.assertEquals(uuid2, repHist2.getUuid());
+     // Because this is a new entry, first seen time equals last seen time
+     assertEquals(repHist2.getLastSeenTime(), repHist2.getFirstSeenTime());
+ 
+     // Remove replica from DN01
+     containerManager.removeContainerReplica(containerID1, containerReplica1);
+     // Should still have 1 container entry in the replica history map
+     Assert.assertEquals(1, repHistMap.size());
+     // Should have 1 entry for this replica
+     Assert.assertEquals(1, repHistMap.get(cIDlong1).size());
+     // And the only entry should match DN02
+     Assert.assertEquals(uuid2,
+         repHistMap.get(cIDlong1).keySet().iterator().next());
+   }
+ }
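
    The removal assertions at the end of testUpdateAndRemoveContainerReplica imply
    that removeContainerReplica also drops the corresponding datanode entry from the
    in-memory history map while leaving the other replicas' entries intact.
    Continuing the hypothetical ReplicaHistorySketch shown earlier (not the
    committed code), the pruning step would be roughly:

        void onReplicaRemoved(long containerId, UUID datanodeUuid) {
          Map<UUID, ReplicaHistory> replicas = replicaHistoryMap.get(containerId);
          if (replicas != null) {
            // Drop only this datanode's entry; replicas of the same container
            // reported by other datanodes keep their history.
            replicas.remove(datanodeUuid);
          }
        }
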


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
