This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 3b6918e  HDDS-4295. Add SCMServiceManager to SCM HA. (#1784)
3b6918e is described below

commit 3b6918e91c55dfa35dff36d155d04f7bb796b3c2
Author: GlenGeng <[email protected]>
AuthorDate: Thu Jan 28 10:14:26 2021 +0800

    HDDS-4295. Add SCMServiceManager to SCM HA. (#1784)
---
 .../hadoop/ozone/protocol/commands/SCMCommand.java |   8 +-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   8 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   2 +-
 .../hdds/scm/block/SCMBlockDeletingService.java    |  54 +++-
 .../container/AbstractContainerReportHandler.java  |   2 +-
 .../scm/container/CloseContainerEventHandler.java  |   2 +-
 .../hdds/scm/container/ReplicationManager.java     | 101 +++++--
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   3 -
 .../org/apache/hadoop/hdds/scm/ha/SCMContext.java  | 114 ++++++--
 .../org/apache/hadoop/hdds/scm/ha/SCMService.java  |  65 +++++
 .../hadoop/hdds/scm/ha/SCMServiceManager.java      |  67 +++++
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |   8 +-
 .../hadoop/hdds/scm/node/NewNodeHandler.java       |  16 +-
 .../scm/node/NonHealthyToHealthyNodeHandler.java   |  18 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   2 +-
 .../scm/pipeline/BackgroundPipelineCreatorV2.java  | 303 +++++++++++++++++++++
 .../hdds/scm/pipeline/PipelineActionHandler.java   |   2 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   7 +-
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   |  98 ++-----
 .../hdds/scm/pipeline/PipelineReportHandler.java   |   2 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   4 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |   4 +-
 .../hdds/scm/safemode/SCMSafeModeManager.java      |  55 ++--
 .../hdds/scm/server/StorageContainerManager.java   |  42 ++-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |  16 +-
 .../container/TestCloseContainerEventHandler.java  |  13 +-
 .../hdds/scm/container/TestReplicationManager.java |  33 ++-
 .../scm/container/TestSCMContainerManager.java     |   5 +-
 .../apache/hadoop/hdds/scm/ha/TestSCMContext.java  |  24 +-
 .../hadoop/hdds/scm/ha/TestSCMServiceManager.java  | 151 ++++++++++
 .../hdds/scm/node/TestContainerPlacement.java      |   4 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   1 -
 .../hdds/scm/pipeline/MockPipelineManager.java     |   8 -
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |  53 ++--
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  10 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  27 +-
 .../TestOneReplicaPipelineSafeModeRule.java        |  11 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  | 114 +++-----
 .../TestRatisPipelineCreateAndDestroy.java         |   5 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |   9 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../TestCloseContainerByPipeline.java              |  10 +-
 .../commandhandler/TestCloseContainerHandler.java  |   2 +-
 .../commandhandler/TestDeleteContainerHandler.java |   8 +-
 44 files changed, 1105 insertions(+), 388 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 4d87bb0..6115f16 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -32,11 +32,9 @@ public abstract class SCMCommand<T extends GeneratedMessage> 
implements
     IdentifiableEventPayload {
   private final long id;
 
-  // Under HA mode, holds term of underlying RaftServer iff current
-  // SCM is a leader, otherwise, holds term 0.
-  // Notes that, the first elected leader is from term 1, term 0,
-  // as the initial value of currentTerm, is never used under HA mode.
-  private long term = 0;
+  // If running upon Ratis, holds term of underlying RaftServer iff current
+  // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
+  private long term;
 
   SCMCommand() {
     this.id = HddsIdFactory.getLongId();
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index c7a2293..410970e 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -306,11 +306,9 @@ message SCMCommandProto {
   optional ClosePipelineCommandProto closePipelineCommandProto = 8;
   optional SetNodeOperationalStateCommandProto 
setNodeOperationalStateCommandProto = 9;
 
-  // Under HA mode, holds term of underlying RaftServer iff current
-  // SCM is a leader, otherwise, holds term 0.
-  // Notes that, the first elected leader is from term 1, term 0,
-  // as the initial value of currentTerm, is never used under HA mode.
-  optional uint64 term = 15;
+  // If running upon Ratis, holds term of underlying RaftServer iff current
+  // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
+  optional int64 term = 15;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index b215026..fb5d5d5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -113,7 +113,7 @@ public class BlockManagerImpl implements BlockManager, 
BlockmanagerMXBean {
     blockDeletingService =
         new SCMBlockDeletingService(deletedBlockLog, containerManager,
             scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
-            svcInterval, serviceTimeout, conf);
+            scm.getSCMServiceManager(), svcInterval, serviceTimeout, conf);
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index c3028a4..0a8e897 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -32,6 +34,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -57,7 +61,8 @@ import org.slf4j.LoggerFactory;
  * SCM HB thread polls cached commands and sends them to datanode for physical
  * processing.
  */
-public class SCMBlockDeletingService extends BackgroundService {
+public class SCMBlockDeletingService extends BackgroundService
+    implements SCMService {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockDeletingService.class);
@@ -71,11 +76,18 @@ public class SCMBlockDeletingService extends 
BackgroundService {
 
   private int blockDeleteLimitSize;
 
+  /**
+   * SCMService related variables.
+   */
+  private final Lock serviceLock = new ReentrantLock();
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
   @SuppressWarnings("parameternumber")
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
       ContainerManagerV2 containerManager, NodeManager nodeManager,
       EventPublisher eventPublisher, SCMContext scmContext,
-      Duration interval, long serviceTimeout, ConfigurationSource conf) {
+      SCMServiceManager serviceManager, Duration interval, long serviceTimeout,
+      ConfigurationSource conf) {
     super("SCMBlockDeletingService", interval.toMillis(), 
TimeUnit.MILLISECONDS,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
@@ -88,6 +100,9 @@ public class SCMBlockDeletingService extends 
BackgroundService {
         conf.getObject(ScmConfig.class).getBlockDeletionLimit();
     Preconditions.checkArgument(blockDeleteLimitSize > 0,
         "Block deletion limit should be " + "positive.");
+
+    // register SCMBlockDeletingService to SCMServiceManager
+    serviceManager.register(this);
   }
 
   @Override
@@ -121,6 +136,10 @@ public class SCMBlockDeletingService extends 
BackgroundService {
 
     @Override
     public EmptyTaskResult call() throws Exception {
+      if (!shouldRun()) {
+        return EmptyTaskResult.newResult();
+      }
+
       long startTime = Time.monotonicNow();
       // Scan SCM DB in HB interval and collect a throttled list of
       // to delete blocks.
@@ -153,7 +172,7 @@ public class SCMBlockDeletingService extends 
BackgroundService {
               // command is bigger than a limit, e.g 50. In case datanode goes
               // offline for sometime, the cached commands be flooded.
               SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
-              command.setTerm(scmContext.getTerm());
+              command.setTerm(scmContext.getTermOfLeader());
               eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
                   new CommandForDatanode<>(dnId, command));
               if (LOG.isDebugEnabled()) {
@@ -200,4 +219,33 @@ public class SCMBlockDeletingService extends 
BackgroundService {
   public void setBlockDeleteTXNum(int numTXs) {
     blockDeleteLimitSize = numTXs;
   }
+
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      if (scmContext.isLeader()) {
+        serviceStatus = ServiceStatus.RUNNING;
+      } else {
+        serviceStatus = ServiceStatus.PAUSING;
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      return serviceStatus == ServiceStatus.RUNNING;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return SCMBlockDeletingService.class.getSimpleName();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index d71539d..d8d31ae 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -327,7 +327,7 @@ public class AbstractContainerReportHandler {
     SCMCommand<?> command = new DeleteContainerCommand(
         containerID.getId(), true);
     try {
-      command.setTerm(scmContext.getTerm());
+      command.setTerm(scmContext.getTermOfLeader());
     } catch (NotLeaderException nle) {
       logger.warn("Skip sending delete container command," +
           " since not leader SCM", nle);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 3320d90..449252c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -81,7 +81,7 @@ public class CloseContainerEventHandler implements 
EventHandler<ContainerID> {
       if (container.getState() == LifeCycleState.CLOSING) {
         SCMCommand<?> command = new CloseContainerCommand(
             containerID.getId(), container.getPipelineID());
-        command.setTerm(scmContext.getTerm());
+        command.setTerm(scmContext.getTermOfLeader());
 
         getNodes(container).forEach(node ->
             publisher.fireEvent(DATANODE_COMMAND,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 0559c3c..ed439d0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -32,13 +32,18 @@ import java.util.Set;
 import java.util.StringJoiner;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -48,11 +53,11 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -84,8 +89,7 @@ import org.slf4j.LoggerFactory;
  * that the containers are properly replicated. Replication Manager deals only
  * with Quasi Closed / Closed container.
  */
-public class ReplicationManager
-    implements MetricsSource, EventHandler<SafeModeStatus> {
+public class ReplicationManager implements MetricsSource, SCMService {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ReplicationManager.class);
@@ -138,7 +142,7 @@ public class ReplicationManager
   /**
    * ReplicationManager specific configuration.
    */
-  private final ReplicationManagerConfiguration conf;
+  private final ReplicationManagerConfiguration rmConf;
 
   /**
    * ReplicationMonitor thread is the one which wakes up at configured
@@ -158,6 +162,16 @@ public class ReplicationManager
   private int minHealthyForMaintenance;
 
   /**
+   * SCMService related variables.
+   * After leaving safe mode, replicationMonitor needs to wait for a while
+   * before really take effect.
+   */
+  private final Lock serviceLock = new ReentrantLock();
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+  private final long waitTimeInMillis;
+  private long lastTimeToBeReadyInMillis = 0;
+
+  /**
    * Constructs ReplicationManager instance with the given configuration.
    *
    * @param conf OzoneConfiguration
@@ -165,11 +179,13 @@ public class ReplicationManager
    * @param containerPlacement PlacementPolicy
    * @param eventPublisher EventPublisher
    */
-  public ReplicationManager(final ReplicationManagerConfiguration conf,
+  @SuppressWarnings("parameternumber")
+  public ReplicationManager(final ConfigurationSource conf,
                             final ContainerManagerV2 containerManager,
                             final PlacementPolicy containerPlacement,
                             final EventPublisher eventPublisher,
                             final SCMContext scmContext,
+                            final SCMServiceManager serviceManager,
                             final LockManager<ContainerID> lockManager,
                             final NodeManager nodeManager) {
     this.containerManager = containerManager;
@@ -178,11 +194,22 @@ public class ReplicationManager
     this.scmContext = scmContext;
     this.lockManager = lockManager;
     this.nodeManager = nodeManager;
-    this.conf = conf;
+    this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
     this.running = false;
     this.inflightReplication = new ConcurrentHashMap<>();
     this.inflightDeletion = new ConcurrentHashMap<>();
-    this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum();
+    this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
+
+    this.waitTimeInMillis = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    // register ReplicationManager to SCMServiceManager.
+    serviceManager.register(this);
+
+    // start ReplicationManager.
+    start();
   }
 
   /**
@@ -263,7 +290,7 @@ public class ReplicationManager
                 " processing {} containers.", Time.monotonicNow() - start,
             containers.size());
 
-        wait(conf.getInterval());
+        wait(rmConf.getInterval());
       }
     } catch (Throwable t) {
       // When we get runtime exception, we should terminate SCM.
@@ -278,6 +305,10 @@ public class ReplicationManager
    * @param container ContainerInfo
    */
   private void processContainer(ContainerInfo container) {
+    if (!shouldRun()) {
+      return;
+    }
+
     final ContainerID id = container.containerID();
     lockManager.lock(id);
     try {
@@ -419,7 +450,7 @@ public class ReplicationManager
       final Map<ContainerID, List<InflightAction>> inflightActions,
       final Predicate<InflightAction> filter) {
     final ContainerID id = container.containerID();
-    final long deadline = Time.monotonicNow() - conf.getEventTimeout();
+    final long deadline = Time.monotonicNow() - rmConf.getEventTimeout();
     if (inflightActions.containsKey(id)) {
       final List<InflightAction> actions = inflightActions.get(id);
 
@@ -971,7 +1002,7 @@ public class ReplicationManager
         new CloseContainerCommand(container.getContainerID(),
             container.getPipelineID(), force);
     try {
-      closeContainerCommand.setTerm(scmContext.getTerm());
+      closeContainerCommand.setTerm(scmContext.getTermOfLeader());
     } catch (NotLeaderException nle) {
       LOG.warn("Skip sending close container command,"
           + " since current SCM is not leader.", nle);
@@ -1043,7 +1074,7 @@ public class ReplicationManager
       final SCMCommand<T> command,
       final Consumer<InflightAction> tracker) {
     try {
-      command.setTerm(scmContext.getTerm());
+      command.setTerm(scmContext.getTermOfLeader());
     } catch (NotLeaderException nle) {
       LOG.warn("Skip sending datanode command,"
           + " since current SCM is not leader.", nle);
@@ -1120,14 +1151,6 @@ public class ReplicationManager
         .endRecord();
   }
 
-  @Override
-  public void onMessage(SafeModeStatus status,
-      EventPublisher publisher) {
-    if (!status.isInSafeMode() && !this.isRunning()) {
-      this.start();
-    }
-  }
-
   /**
    * Wrapper class to hold the InflightAction with its start time.
    */
@@ -1241,4 +1264,42 @@ public class ReplicationManager
           .toString();
     }
   }
+
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      // 1) SCMContext#isLeader returns true.
+      // 2) not in safe mode.
+      if (scmContext.isLeader() && !scmContext.isInSafeMode()) {
+        // transition from PAUSING to RUNNING
+        if (serviceStatus != ServiceStatus.RUNNING) {
+          LOG.info("Service {} transitions to RUNNING.", getServiceName());
+          lastTimeToBeReadyInMillis = Time.monotonicNow();
+          serviceStatus = ServiceStatus.RUNNING;
+        }
+      } else {
+        serviceStatus = ServiceStatus.PAUSING;
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      return serviceStatus == ServiceStatus.RUNNING &&
+          Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return ReplicationManager.class.getSimpleName();
+  }
 }
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 6f6cc54..d01257b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -194,9 +194,6 @@ public final class SCMEvents {
   public static final TypedEvent<SafeModeStatus> SAFE_MODE_STATUS =
       new TypedEvent<>(SafeModeStatus.class, "Safe mode status");
 
-  public static final TypedEvent<SafeModeStatus> DELAYED_SAFE_MODE_STATUS =
-      new TypedEvent<>(SafeModeStatus.class, "Delayed safe mode status");
-
   /**
    * Private Ctor. Never Constructed.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
index 17dad7e..2d4941f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdds.scm.ha;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,14 +30,24 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  * SCMContext is the single source of truth for some key information shared
  * across all components within SCM, including:
- *  - RaftServer related info, e.g., isLeader, term.
- *  - SafeMode related info, e.g., inSafeMode, preCheckComplete.
+ * 1) RaftServer related info, e.g., isLeader, term.
+ * 2) SafeMode related info, e.g., inSafeMode, preCheckComplete.
+ *
+ * If current SCM is not running upon Ratis, the {@link SCMContext#isLeader}
+ * check will always return true, and {@link SCMContext#getTermOfLeader} will
+ * return INVALID_TERM.
  */
-public class SCMContext implements EventHandler<SafeModeStatus> {
+public final class SCMContext {
   private static final Logger LOG = LoggerFactory.getLogger(SCMContext.class);
 
+  /**
+   * The initial value of term in raft is 0, and term increases monotonically.
+   * term equals INVALID_TERM indicates current SCM is running without Ratis.
+   */
+  public static final long INVALID_TERM = -1;
+
   private static final SCMContext EMPTY_CONTEXT
-      = new SCMContext(true, 0, new SafeModeStatus(false, true), null);
+      = new SCMContext.Builder().build();
 
   /**
    * Used by non-HA mode SCM, Recon and Unit Tests.
@@ -62,9 +70,8 @@ public class SCMContext implements 
EventHandler<SafeModeStatus> {
   private final StorageContainerManager scm;
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  SCMContext(boolean isLeader, long term,
-             final SafeModeStatus safeModeStatus,
-             final StorageContainerManager scm) {
+  private SCMContext(boolean isLeader, long term,
+      final SafeModeStatus safeModeStatus, final StorageContainerManager scm) {
     this.isLeader = isLeader;
     this.term = term;
     this.safeModeStatus = safeModeStatus;
@@ -72,25 +79,16 @@ public class SCMContext implements 
EventHandler<SafeModeStatus> {
   }
 
   /**
-   * Creates SCMContext instance from StorageContainerManager.
-   */
-  public SCMContext(final StorageContainerManager scm) {
-    this(false, 0, new SafeModeStatus(true, false), scm);
-    Preconditions.checkNotNull(scm, "scm is null");
-  }
-
-  /**
-   *
-   * @param newIsLeader : is leader or not
-   * @param newTerm     : term if current SCM becomes leader
+   * @param leader  : is leader or not
+   * @param newTerm : term if current SCM becomes leader
    */
-  public void updateIsLeaderAndTerm(boolean newIsLeader, long newTerm) {
+  public void updateLeaderAndTerm(boolean leader, long newTerm) {
     lock.writeLock().lock();
     try {
       LOG.info("update <isLeader,term> from <{},{}> to <{},{}>",
-          isLeader, term, newIsLeader, newTerm);
+          isLeader, term, leader, newTerm);
 
-      isLeader = newIsLeader;
+      isLeader = leader;
       term = newTerm;
     } finally {
       lock.writeLock().unlock();
@@ -105,6 +103,10 @@ public class SCMContext implements 
EventHandler<SafeModeStatus> {
   public boolean isLeader() {
     lock.readLock().lock();
     try {
+      if (term == INVALID_TERM) {
+        return true;
+      }
+
       return isLeader;
     } finally {
       lock.readLock().unlock();
@@ -117,9 +119,13 @@ public class SCMContext implements 
EventHandler<SafeModeStatus> {
    * @return term
    * @throws NotLeaderException if isLeader is false
    */
-  public long getTerm() throws NotLeaderException {
+  public long getTermOfLeader() throws NotLeaderException {
     lock.readLock().lock();
     try {
+      if (term == INVALID_TERM) {
+        return term;
+      }
+
       if (!isLeader) {
         LOG.warn("getTerm is invoked when not leader.");
         throw scm.getScmHAManager()
@@ -132,9 +138,10 @@ public class SCMContext implements 
EventHandler<SafeModeStatus> {
     }
   }
 
-  @Override
-  public void onMessage(SafeModeStatus status,
-                        EventPublisher publisher) {
+  /**
+   * @param status : update SCMContext with latest SafeModeStatus.
+   */
+  public void updateSafeModeStatus(SafeModeStatus status) {
     lock.writeLock().lock();
     try {
       LOG.info("Update SafeModeStatus from {} to {}.", safeModeStatus, status);
@@ -161,4 +168,57 @@ public class SCMContext implements 
EventHandler<SafeModeStatus> {
       lock.readLock().unlock();
     }
   }
+
+  /**
+   * @return StorageContainerManager
+   */
+  public StorageContainerManager getScm() {
+    Preconditions.checkNotNull(scm, "scm == null");
+    return scm;
+  }
+
+  public static class Builder {
+    /**
+     * The default context:
+     * running without Ratis, out of safe mode, and has completed preCheck.
+     */
+    private boolean isLeader = false;
+    private long term = INVALID_TERM;
+    private boolean isInSafeMode = false;
+    private boolean isPreCheckComplete = true;
+    private StorageContainerManager scm = null;
+
+    public Builder setLeader(boolean leader) {
+      this.isLeader = leader;
+      return this;
+    }
+
+    public Builder setTerm(long newTerm) {
+      this.term = newTerm;
+      return this;
+    }
+
+    public Builder setIsInSafeMode(boolean inSafeMode) {
+      this.isInSafeMode = inSafeMode;
+      return this;
+    }
+
+    public Builder setIsPreCheckComplete(boolean preCheckComplete) {
+      this.isPreCheckComplete = preCheckComplete;
+      return this;
+    }
+
+    public Builder setSCM(StorageContainerManager storageContainerManager) {
+      this.scm = storageContainerManager;
+      return this;
+    }
+
+    public SCMContext build() {
+      return new SCMContext(
+          isLeader,
+          term,
+          new SafeModeStatus(isInSafeMode, isPreCheckComplete),
+          scm);
+    }
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
new file mode 100644
index 0000000..32194a6
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * Interface for background services in SCM, including ReplicationManager,
+ * SCMBlockDeletingService and BackgroundPipelineCreator.
+ *
+ * Provide a fine-grained method to manipulate the status of these background
+ * services.
+ */
+public interface SCMService {
+  /**
+   * Notify raft or safe mode related status changed.
+   */
+  void notifyStatusChanged();
+
+  /**
+   * @param event latest triggered event.
+   */
+  default void notifyEventTriggered(Event event) {
+  }
+
+  /**
+   * @return true, if next iteration of Service should take effect,
+   *         false, if next iteration of Service should be skipped.
+   */
+  boolean shouldRun();
+
+  /**
+   * @return name of the Service.
+   */
+  String getServiceName();
+
+  /**
+   * Status of Service.
+   */
+  enum ServiceStatus {
+    RUNNING,
+    PAUSING
+  }
+
+  /**
+   * One time event.
+   */
+  enum Event {
+    PRE_CHECK_COMPLETED,
+    NEW_NODE_HANDLER_TRIGGERED,
+    UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
new file mode 100644
index 0000000..52e3d26
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdds.scm.ha.SCMService.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Manipulate background services in SCM, including ReplicationManager,
+ * SCMBlockDeletingService and BackgroundPipelineCreator.
+ *
+ * All methods are synchronized on this manager, so registration and
+ * notifications are serialized with respect to each other.
+ */
+public final class SCMServiceManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMServiceManager.class);
+
+  private final List<SCMService> services = new ArrayList<>();
+
+  /**
+   * Register a SCMService to SCMServiceManager.
+   */
+  public synchronized void register(SCMService service) {
+    Preconditions.checkNotNull(service);
+    LOG.info("Registering service {}.", service.getServiceName());
+    services.add(service);
+  }
+
+  /**
+   * Notify raft or safe mode related status changed.
+   */
+  public synchronized void notifyStatusChanged() {
+    for (SCMService service : services) {
+      LOG.debug("Notify service:{}.", service.getServiceName());
+      service.notifyStatusChanged();
+    }
+  }
+
+  /**
+   * Notify event triggered, which may affect SCMService.
+   */
+  public synchronized void notifyEventTriggered(Event event) {
+    for (SCMService service : services) {
+      // fix: pattern previously read "event:{." — a malformed SLF4J
+      // placeholder that dropped the event from the log message.
+      LOG.debug("Notify service:{} with event:{}.",
+          service.getServiceName(), event);
+      service.notifyEventTriggered(event);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index aa366e1..a04f0d8 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -133,7 +133,9 @@ public class SCMStateMachine extends BaseStateMachine {
   @Override
   public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
     LOG.info("current leader SCM steps down.");
-    scm.getScmContext().updateIsLeaderAndTerm(false, 0);
+
+    scm.getScmContext().updateLeaderAndTerm(false, 0);
+    scm.getSCMServiceManager().notifyStatusChanged();
   }
 
   @Override
@@ -151,7 +153,9 @@ public class SCMStateMachine extends BaseStateMachine {
         .getCurrentTerm();
 
     LOG.info("current SCM becomes leader of term {}.", term);
-    scm.getScmContext().updateIsLeaderAndTerm(true, term);
+
+    scm.getScmContext().updateLeaderAndTerm(true, term);
+    scm.getSCMServiceManager().notifyStatusChanged();
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index c511b55..674cf2d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.hdds.scm.node;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,32 +40,33 @@ public class NewNodeHandler implements 
EventHandler<DatanodeDetails> {
   private final PipelineManager pipelineManager;
   private final NodeDecommissionManager decommissionManager;
   private final ConfigurationSource conf;
+  private final SCMServiceManager serviceManager;
 
   public NewNodeHandler(PipelineManager pipelineManager,
       NodeDecommissionManager decommissionManager,
-      ConfigurationSource conf) {
+      ConfigurationSource conf,
+      SCMServiceManager serviceManager) {
     this.pipelineManager = pipelineManager;
     this.decommissionManager = decommissionManager;
     this.conf = conf;
+    this.serviceManager = serviceManager;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
     try {
-      pipelineManager.triggerPipelineCreation();
+      serviceManager.notifyEventTriggered(Event.NEW_NODE_HANDLER_TRIGGERED);
+
       if (datanodeDetails.getPersistedOpState()
           != HddsProtos.NodeOperationalState.IN_SERVICE) {
         decommissionManager.continueAdminForNode(datanodeDetails);
       }
-    }catch (NodeNotFoundException  e) {
+    } catch (NodeNotFoundException e) {
       // Should not happen, as the node has just registered to call this event
       // handler.
       LOG.warn("NodeNotFound when adding the node to the decommissionManager",
           e);
-    } catch (NotLeaderException nle) {
-      LOG.debug("Not the current leader SCM and cannot start pipeline" +
-          " creation.");
     }
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
index 1cb6501..d74f90f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hdds.scm.node;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,23 +35,19 @@ public class NonHealthyToHealthyNodeHandler
   private static final Logger LOG =
       LoggerFactory.getLogger(NonHealthyToHealthyNodeHandler.class);
 
-  private final PipelineManager pipelineManager;
   private final ConfigurationSource conf;
+  private final SCMServiceManager serviceManager;
 
   public NonHealthyToHealthyNodeHandler(
-      PipelineManager pipelineManager, OzoneConfiguration conf) {
-    this.pipelineManager = pipelineManager;
+      OzoneConfiguration conf,  SCMServiceManager serviceManager) {
     this.conf = conf;
+    this.serviceManager = serviceManager;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    try {
-      pipelineManager.triggerPipelineCreation();
-    } catch (NotLeaderException ex) {
-      LOG.debug("Not the current leader SCM and cannot start pipeline" +
-          " creation.");
-    }
+    serviceManager.notifyEventTriggered(
+        Event.UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED);
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 4e6f53e..e2e50e8 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -433,7 +433,7 @@ public class SCMNodeManager implements NodeManager {
             Time.monotonicNow(),
             scmStatus.getOperationalState(),
             scmStatus.getOpStateExpiryEpochSeconds());
-        command.setTerm(scmContext.getTerm());
+        command.setTerm(scmContext.getTermOfLeader());
         addDatanodeCommand(reportedDn.getUuid(), command);
       } catch (NotLeaderException nle) {
         LOG.warn("Skip sending SetNodeOperationalStateCommand,"
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java
new file mode 100644
index 0000000..343444d
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.collections.iterators.LoopingIterator;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static 
org.apache.hadoop.hdds.scm.ha.SCMService.Event.UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED;
+import static 
org.apache.hadoop.hdds.scm.ha.SCMService.Event.NEW_NODE_HANDLER_TRIGGERED;
+import static 
org.apache.hadoop.hdds.scm.ha.SCMService.Event.PRE_CHECK_COMPLETED;
+
+/**
+ * Implements api for running background pipeline creation jobs.
+ *
+ * A single daemon-style worker thread (RatisPipelineUtilsThread) wakes up
+ * at a configured interval and, when {@link #shouldRun()} permits, scrubs
+ * and creates pipelines. The SCMService state machine (RUNNING/PAUSING)
+ * follows leadership and safe-mode status via notifyStatusChanged().
+ */
+public class BackgroundPipelineCreatorV2 implements SCMService {
+
+  // fix: logger was previously created for BackgroundPipelineCreator.class,
+  // which put this class's messages under the wrong logger category.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BackgroundPipelineCreatorV2.class);
+
+  private final PipelineManager pipelineManager;
+  private final ConfigurationSource conf;
+  private final SCMContext scmContext;
+
+  /**
+   * SCMService related variables.
+   * 1) after leaving safe mode, BackgroundPipelineCreator needs to
+   *    wait for a while before really take effect.
+   * 2) NewNodeHandler, NonHealthyToHealthyNodeHandler, PreCheckComplete
+   *    will trigger a one-shot run of BackgroundPipelineCreator,
+   *    no matter in safe mode or not.
+   */
+  private final Lock serviceLock = new ReentrantLock();
+  // serviceStatus, lastTimeToBeReadyInMillis and oneShotRun are guarded
+  // by serviceLock.
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+  private final boolean createPipelineInSafeMode;
+  private final long waitTimeInMillis;
+  private long lastTimeToBeReadyInMillis = 0;
+  private boolean oneShotRun = false;
+
+  /**
+   * RatisPipelineUtilsThread is the one which wakes up at
+   * configured interval and tries to create pipelines.
+   */
+  private Thread thread;
+  private final Object monitor = new Object();
+  private static final String THREAD_NAME = "RatisPipelineUtilsThread";
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final long intervalInMillis;
+
+
+  /**
+   * @param pipelineManager manager used to create and scrub pipelines
+   * @param conf            configuration source
+   * @param serviceManager  SCMServiceManager this service registers with
+   * @param scmContext      SCM context providing leadership/safe-mode state
+   */
+  BackgroundPipelineCreatorV2(PipelineManager pipelineManager,
+                              ConfigurationSource conf,
+                              SCMServiceManager serviceManager,
+                              SCMContext scmContext) {
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+    this.scmContext = scmContext;
+
+    this.createPipelineInSafeMode = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+    this.waitTimeInMillis = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    this.intervalInMillis = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    // register BackgroundPipelineCreator to SCMServiceManager
+    serviceManager.register(this);
+
+    // start RatisPipelineUtilsThread
+    // NOTE(review): starting the thread here lets `this` escape the
+    // constructor; acceptable only while this class is not subclassed.
+    start();
+  }
+
+  /**
+   * Start RatisPipelineUtilsThread. Idempotent: a second call while the
+   * thread is running only logs a warning.
+   */
+  public void start() {
+    if (!running.compareAndSet(false, true)) {
+      LOG.warn("{} is already started, just ignore.", THREAD_NAME);
+      return;
+    }
+
+    LOG.info("Starting {}.", THREAD_NAME);
+
+    thread = new ThreadFactoryBuilder()
+        .setDaemon(false)
+        .setNameFormat(THREAD_NAME + " - %d")
+        .setUncaughtExceptionHandler((Thread t, Throwable ex) -> {
+          // gracefully shutdown SCM.
+          scmContext.getScm().stop();
+
+          String message = "Terminate SCM, encounter uncaught exception"
+              + " in RatisPipelineUtilsThread";
+          ExitUtils.terminate(1, message, ex, LOG);
+        })
+        .build()
+        .newThread(this::run);
+
+    thread.start();
+  }
+
+  /**
+   * Stop RatisPipelineUtilsThread and wait for it to exit.
+   */
+  public void stop() {
+    // fix: condition was previously not negated, so stop() returned early
+    // (with a misleading "is not running" warning) exactly when the thread
+    // WAS running, and tried to join a never-started thread otherwise.
+    if (!running.compareAndSet(true, false)) {
+      LOG.warn("{} is not running, just ignore.", THREAD_NAME);
+      return;
+    }
+
+    LOG.info("Stopping {}.", THREAD_NAME);
+
+    // in case RatisPipelineUtilsThread is sleeping
+    synchronized (monitor) {
+      monitor.notifyAll();
+    }
+
+    try {
+      if (thread != null) {
+        thread.join();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted during join {}.", THREAD_NAME);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Worker loop: run one creation pass when permitted, then sleep for the
+   * configured interval (or until notified) and repeat until stopped.
+   */
+  private void run() {
+    while (running.get()) {
+      if (shouldRun()) {
+        createPipelines();
+      }
+
+      try {
+        synchronized (monitor) {
+          monitor.wait(intervalInMillis);
+        }
+      } catch (InterruptedException e) {
+        // NOTE(review): the loop keeps spinning with the interrupt flag set
+        // until running becomes false — confirm this is intended.
+        LOG.warn("{} is interrupted.", THREAD_NAME);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * @return true if pipeline creation for this factor/type combination
+   *         should be skipped by the background creator.
+   */
+  private boolean skipCreation(HddsProtos.ReplicationFactor factor,
+                               HddsProtos.ReplicationType type,
+                               boolean autoCreate) {
+    if (type == HddsProtos.ReplicationType.RATIS) {
+      // RATIS factor ONE pipelines are only auto-created when enabled.
+      return factor == HddsProtos.ReplicationFactor.ONE && (!autoCreate);
+    } else {
+      // For STAND_ALONE Replication Type, Replication Factor 3 should not be
+      // used.
+      return factor == HddsProtos.ReplicationFactor.THREE;
+    }
+  }
+
+  /**
+   * One creation pass: scrub existing pipelines (outside safe mode), then
+   * create pipelines for each eligible replication factor until creation
+   * fails for all of them. Never throws; errors are logged.
+   */
+  private void createPipelines() {
+    // TODO: #CLUTIL Different replication factor may need to be supported
+    HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
+        conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+            OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+    boolean autoCreateFactorOne = conf.getBoolean(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE_DEFAULT);
+
+    List<HddsProtos.ReplicationFactor> list =
+        new ArrayList<>();
+    for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+        .values()) {
+      if (skipCreation(factor, type, autoCreateFactorOne)) {
+        // Skip this iteration for creating pipeline
+        continue;
+      }
+      list.add(factor);
+      if (!pipelineManager.getSafeModeStatus()) {
+        try {
+          pipelineManager.scrubPipeline(type, factor);
+        } catch (IOException e) {
+          LOG.error("Error while scrubbing pipelines.", e);
+        }
+      }
+    }
+
+    // loop over eligible factors, dropping each one once creation fails;
+    // terminates when no factor can create any more pipelines.
+    LoopingIterator it = new LoopingIterator(list);
+    while (it.hasNext()) {
+      HddsProtos.ReplicationFactor factor =
+          (HddsProtos.ReplicationFactor) it.next();
+
+      try {
+        pipelineManager.createPipeline(type, factor);
+      } catch (IOException ioe) {
+        it.remove();
+      } catch (Throwable t) {
+        LOG.error("Error while creating pipelines", t);
+        it.remove();
+      }
+    }
+
+    LOG.debug("BackgroundPipelineCreator createPipelines finished.");
+  }
+
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      // RUNNING requires both:
+      // 1) SCMContext#isLeader returns true.
+      // 2) not in safe mode or createPipelineInSafeMode is true
+      if (scmContext.isLeader() &&
+          (!scmContext.isInSafeMode() || createPipelineInSafeMode)) {
+        // transition from PAUSING to RUNNING
+        if (serviceStatus != ServiceStatus.RUNNING) {
+          LOG.info("Service {} transitions to RUNNING.", getServiceName());
+          // remember when we became ready, for the post-safe-mode delay.
+          lastTimeToBeReadyInMillis = Time.monotonicNow();
+          serviceStatus = ServiceStatus.RUNNING;
+        }
+      } else {
+        serviceStatus = ServiceStatus.PAUSING;
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public void notifyEventTriggered(Event event) {
+    if (!scmContext.isLeader()) {
+      LOG.info("ignore, not leader SCM.");
+      return;
+    }
+    if (event == NEW_NODE_HANDLER_TRIGGERED
+        || event == UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED
+        || event == PRE_CHECK_COMPLETED) {
+      LOG.info("trigger a one-shot run on {}.", THREAD_NAME);
+
+      // fix: oneShotRun is read and cleared under serviceLock in
+      // shouldRun(); take the same lock here so this write is not a
+      // data race against the worker thread.
+      serviceLock.lock();
+      try {
+        oneShotRun = true;
+      } finally {
+        serviceLock.unlock();
+      }
+
+      synchronized (monitor) {
+        monitor.notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // check one-shot run
+      if (oneShotRun) {
+        oneShotRun = false;
+        return true;
+      }
+
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      return serviceStatus == ServiceStatus.RUNNING && (
+          createPipelineInSafeMode ||
+          Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis);
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    // NOTE(review): intentionally reports the V1 name
+    // ("BackgroundPipelineCreator") — presumably so callers keying on the
+    // service name stay compatible; confirm before renaming.
+    return BackgroundPipelineCreator.class.getSimpleName();
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 89d2833..e33f256 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -94,7 +94,7 @@ public class PipelineActionHandler
           "firing close pipeline event.", action, pid);
       SCMCommand<?> command = new ClosePipelineCommand(pid);
       try {
-        command.setTerm(scmContext.getTerm());
+        command.setTerm(scmContext.getTermOfLeader());
       } catch (NotLeaderException nle) {
         LOG.warn("Skip sending ClosePipelineCommand for pipeline {}," +
             " since not leader SCM.", pid);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9f714da..2078460 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -28,15 +28,12 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 
 /**
  * Interface which exposes the api for pipeline management.
  */
-public interface PipelineManager extends Closeable, PipelineManagerMXBean,
-    EventHandler<SafeModeStatus> {
+public interface PipelineManager extends Closeable, PipelineManagerMXBean {
 
   Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException;
@@ -85,7 +82,7 @@ public interface PipelineManager extends Closeable, 
PipelineManagerMXBean,
 
   void startPipelineCreator();
 
-  void triggerPipelineCreation() throws NotLeaderException;
+  void triggerPipelineCreation();
 
   void incNumBlocksAllocatedMetric(PipelineID id);
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 3c88174..d0a4c96 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -30,10 +30,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.Scheduler;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.Time;
@@ -52,7 +51,6 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -69,27 +67,24 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
   private final Lock lock;
   private PipelineFactory pipelineFactory;
   private StateManager stateManager;
-  private Scheduler scheduler;
-  private BackgroundPipelineCreator backgroundPipelineCreator;
+  private BackgroundPipelineCreatorV2 backgroundPipelineCreator;
   private final ConfigurationSource conf;
   private final EventPublisher eventPublisher;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
   private final SCMPipelineMetrics metrics;
-  private long pipelineWaitDefaultTimeout;
-  private final AtomicBoolean isInSafeMode;
-  private SCMHAManager scmhaManager;
-  private NodeManager nodeManager;
-  // Used to track if the safemode pre-checks have completed. This is designed
-  // to prevent pipelines being created until sufficient nodes have registered.
-  private final AtomicBoolean pipelineCreationAllowed;
+  private final long pipelineWaitDefaultTimeout;
+  private final SCMHAManager scmhaManager;
+  private final SCMContext scmContext;
+  private final NodeManager nodeManager;
 
   private PipelineManagerV2Impl(ConfigurationSource conf,
                                 SCMHAManager scmhaManager,
                                 NodeManager nodeManager,
                                 StateManager pipelineStateManager,
                                 PipelineFactory pipelineFactory,
-                                EventPublisher eventPublisher) {
+                                EventPublisher eventPublisher,
+                                SCMContext scmContext) {
     this.lock = new ReentrantLock();
     this.pipelineFactory = pipelineFactory;
     this.stateManager = pipelineStateManager;
@@ -97,6 +92,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
     this.scmhaManager = scmhaManager;
     this.nodeManager = nodeManager;
     this.eventPublisher = eventPublisher;
+    this.scmContext = scmContext;
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
     this.metrics = SCMPipelineMetrics.create();
@@ -104,12 +100,6 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
-    this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
-        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
-        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
-    // Pipeline creation is only allowed after the safemode prechecks have
-    // passed, eg sufficient nodes have registered.
-    this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
   }
 
   public static PipelineManagerV2Impl newPipelineManager(
@@ -118,7 +108,8 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
       NodeManager nodeManager,
       Table<PipelineID, Pipeline> pipelineStore,
       EventPublisher eventPublisher,
-      SCMContext scmContext) throws IOException {
+      SCMContext scmContext,
+      SCMServiceManager serviceManager) throws IOException {
     // Create PipelineStateManager
     StateManager stateManager = PipelineStateManagerV2Impl
         .newBuilder().setPipelineStore(pipelineStore)
@@ -130,18 +121,18 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
     // Create PipelineFactory
     PipelineFactory pipelineFactory = new PipelineFactory(
         nodeManager, stateManager, conf, eventPublisher, scmContext);
+
     // Create PipelineManager
     PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
         scmhaManager, nodeManager, stateManager, pipelineFactory,
-        eventPublisher);
+        eventPublisher, scmContext);
 
     // Create background thread.
-    Scheduler scheduler = new Scheduler(
-        "RatisPipelineUtilsThread", false, 1);
-    BackgroundPipelineCreator backgroundPipelineCreator =
-        new BackgroundPipelineCreator(pipelineManager, scheduler, conf);
+    BackgroundPipelineCreatorV2 backgroundPipelineCreator =
+        new BackgroundPipelineCreatorV2(
+            pipelineManager, conf, serviceManager, scmContext);
+
     pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
-    pipelineManager.setScheduler(scheduler);
 
     return pipelineManager;
   }
@@ -385,17 +376,15 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
    */
   @Override
   public void startPipelineCreator() {
-    backgroundPipelineCreator.startFixedIntervalPipelineCreator();
+    throw new RuntimeException("Not supported in HA code.");
   }
 
   /**
    * Triggers pipeline creation after the specified time.
    */
   @Override
-  public void triggerPipelineCreation() throws NotLeaderException {
-    // TODO add checkLeader once follower validates safemode
-    // before it becomes leader.
-    backgroundPipelineCreator.triggerPipelineCreation();
+  public void triggerPipelineCreation() {
+    throw new RuntimeException("Not supported in HA code.");
   }
 
   @Override
@@ -497,14 +486,13 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
    */
   @Override
   public boolean getSafeModeStatus() {
-    return this.isInSafeMode.get();
+    return scmContext.isInSafeMode();
   }
 
   @Override
   public void close() throws IOException {
-    if (scheduler != null) {
-      scheduler.close();
-      scheduler = null;
+    if (backgroundPipelineCreator != null) {
+      backgroundPipelineCreator.stop();
     }
 
     if(pmInfoBean != null) {
@@ -523,40 +511,9 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
     }
   }
 
-  @Override
-  public void onMessage(SCMSafeModeManager.SafeModeStatus status,
-                        EventPublisher publisher) {
-    // TODO: #CLUTIL - handle safemode getting re-enabled
-    boolean currentAllowPipelines =
-        pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
-    boolean currentlyInSafeMode =
-        isInSafeMode.getAndSet(status.isInSafeMode());
-
-    // Trigger pipeline creation only if the preCheck status has changed to
-    // complete.
-
-    try {
-      if (isPipelineCreationAllowed() && !currentAllowPipelines) {
-        triggerPipelineCreation();
-      }
-      // Start the pipeline creation thread only when safemode switches off
-      if (!getSafeModeStatus() && currentlyInSafeMode) {
-        startPipelineCreator();
-      }
-    } catch (NotLeaderException ex) {
-      LOG.warn("Not leader SCM, cannot process pipeline creation.");
-    }
-
-  }
-
   @VisibleForTesting
   public boolean isPipelineCreationAllowed() {
-    return pipelineCreationAllowed.get();
-  }
-
-  @VisibleForTesting
-  public void allowPipelineCreation() {
-    this.pipelineCreationAllowed.set(true);
+    return scmContext.isLeader() && scmContext.isPreCheckComplete();
   }
 
   @VisibleForTesting
@@ -576,12 +533,13 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
   }
 
   private void setBackgroundPipelineCreator(
-      BackgroundPipelineCreator backgroundPipelineCreator) {
+      BackgroundPipelineCreatorV2 backgroundPipelineCreator) {
     this.backgroundPipelineCreator = backgroundPipelineCreator;
   }
 
-  private void setScheduler(Scheduler scheduler) {
-    this.scheduler = scheduler;
+  @VisibleForTesting
+  public BackgroundPipelineCreatorV2 getBackgroundPipelineCreator() {
+    return this.backgroundPipelineCreator;
   }
 
   private void recordMetricsForPipeline(Pipeline pipeline) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index ca51433..8fc7f3e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -103,7 +103,7 @@ public class PipelineReportHandler implements
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
       SCMCommand<?> command = new ClosePipelineCommand(pipelineID);
-      command.setTerm(scmContext.getTerm());
+      command.setTerm(scmContext.getTermOfLeader());
       publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
           new CommandForDatanode<>(dn.getUuid(), command));
       return;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index ede3b1e..e485a28 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -163,7 +163,7 @@ public class RatisPipelineProvider extends PipelineProvider 
{
         new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
             factor, dns);
 
-    createCommand.setTerm(scmContext.getTerm());
+    createCommand.setTerm(scmContext.getTermOfLeader());
 
     dns.forEach(node -> {
       LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}",
@@ -201,7 +201,7 @@ public class RatisPipelineProvider extends PipelineProvider 
{
   public void close(Pipeline pipeline) throws NotLeaderException {
     final ClosePipelineCommand closeCommand =
         new ClosePipelineCommand(pipeline.getId());
-    closeCommand.setTerm(scmContext.getTerm());
+    closeCommand.setTerm(scmContext.getTermOfLeader());
     pipeline.getNodes().forEach(node -> {
       final CommandForDatanode<?> datanodeCommand =
           new CommandForDatanode<>(node.getUuid(), closeCommand);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 892e3bc..a1cef22 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.Scheduler;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -64,7 +65,8 @@ import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAI
  * for pipelines must come via PipelineManager. It synchronises all write
  * and read operations via a ReadWriteLock.
  */
-public class SCMPipelineManager implements PipelineManager {
+public class SCMPipelineManager implements
+    PipelineManager, EventHandler<SafeModeStatus> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMPipelineManager.class);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index 26fb806..e4e069a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -30,6 +29,9 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -83,7 +85,6 @@ public class SCMSafeModeManager implements SafeModeManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMSafeModeManager.class);
   private final boolean isSafeModeEnabled;
-  private final long waitTime;
   private AtomicBoolean inSafeMode = new AtomicBoolean(true);
   private AtomicBoolean preCheckComplete = new AtomicBoolean(false);
 
@@ -102,25 +103,24 @@ public class SCMSafeModeManager implements 
SafeModeManager {
 
   private final EventQueue eventPublisher;
   private final PipelineManager pipelineManager;
+  private final SCMServiceManager serviceManager;
+  private final SCMContext scmContext;
 
   private final SafeModeMetrics safeModeMetrics;
 
   public SCMSafeModeManager(ConfigurationSource conf,
       List<ContainerInfo> allContainers, PipelineManager pipelineManager,
-      EventQueue eventQueue) {
+      EventQueue eventQueue, SCMServiceManager serviceManager,
+      SCMContext scmContext) {
     this.config = conf;
     this.pipelineManager = pipelineManager;
     this.eventPublisher = eventQueue;
+    this.serviceManager = serviceManager;
+    this.scmContext = scmContext;
     this.isSafeModeEnabled = conf.getBoolean(
         HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT);
 
-
-    this.waitTime = conf.getTimeDuration(
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-
     if (isSafeModeEnabled) {
       this.safeModeMetrics = SafeModeMetrics.create();
       ContainerSafeModeRule containerSafeModeRule =
@@ -147,13 +147,6 @@ public class SCMSafeModeManager implements SafeModeManager 
{
         exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
             oneReplicaPipelineSafeModeRule);
       }
-      boolean createPipelineInSafemode = conf.getBoolean(
-          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
-          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
-
-      if (createPipelineInSafemode) {
-        pipelineManager.startPipelineCreator();
-      }
     } else {
       this.safeModeMetrics = null;
       exitSafeMode(eventQueue);
@@ -177,28 +170,22 @@ public class SCMSafeModeManager implements 
SafeModeManager {
   public void emitSafeModeStatus() {
     SafeModeStatus safeModeStatus =
         new SafeModeStatus(getInSafeMode(), getPreCheckComplete());
+    // TODO: remove eventPublisher,
+    //  since there will no consumer of SAFE_MODE_STATUS in future.
     eventPublisher.fireEvent(SCMEvents.SAFE_MODE_STATUS,
         safeModeStatus);
 
-    // Only notify the delayed listeners if safemode remains on, as precheck
-    // may have completed.
-    if (safeModeStatus.isInSafeMode()) {
-      eventPublisher.fireEvent(SCMEvents.DELAYED_SAFE_MODE_STATUS,
-          safeModeStatus);
-    } else {
+    // update SCMContext
+    scmContext.updateSafeModeStatus(safeModeStatus);
+
+    // notify SCMServiceManager
+    if (!safeModeStatus.isInSafeMode()) {
       // If safemode is off, then notify the delayed listeners with a delay.
-      final Thread safeModeExitThread = new Thread(() -> {
-        try {
-          Thread.sleep(waitTime);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-        eventPublisher.fireEvent(SCMEvents.DELAYED_SAFE_MODE_STATUS,
-            safeModeStatus);
-      });
-
-      safeModeExitThread.setDaemon(true);
-      safeModeExitThread.start();
+      serviceManager.notifyStatusChanged();
+    } else if (safeModeStatus.isPreCheckComplete()) {
+      // Only notify the delayed listeners if safemode remains on, as precheck
+      // may have completed.
+      serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 6782f52..809dda9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -55,6 +55,7 @@ import 
org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -70,7 +71,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
-import 
org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
@@ -174,6 +174,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
   private SCMContext scmContext;
 
   private final EventQueue eventQueue;
+  private final SCMServiceManager serviceManager;
+
   /*
    * HTTP endpoint for JMX access.
    */
@@ -284,6 +286,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     }
 
     eventQueue = new EventQueue();
+    serviceManager = new SCMServiceManager();
+
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
             HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
@@ -303,7 +307,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         new CommandStatusReportHandler();
 
     NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager,
-        scmDecommissionManager, conf);
+        scmDecommissionManager, conf, serviceManager);
     StaleNodeHandler staleNodeHandler =
         new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
@@ -311,7 +315,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     StartDatanodeAdminHandler datanodeStartAdminHandler =
         new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
     NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
-        new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
+        new NonHealthyToHealthyNodeHandler(conf, serviceManager);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
     PendingDeleteHandler pendingDeleteHandler =
         new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
@@ -361,14 +365,6 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
-    eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, scmContext);
-    // TODO:
-    //  handle replicationManager and pipelineManager in ServiceManager
-    eventQueue
-        .addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, replicationManager);
-    eventQueue
-        .addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, pipelineManager);
-
 
     // Emit initial safe mode status, as now handlers are registered.
     scmSafeModeManager.emitSafeModeStatus();
@@ -436,7 +432,14 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     if (configurator.getScmContext() != null) {
       scmContext = configurator.getScmContext();
     } else {
-      scmContext = new SCMContext(this);
+      // non-leader of term 0, in safe mode, preCheck not completed.
+      scmContext = new SCMContext.Builder()
+          .setLeader(false)
+          .setTerm(0)
+          .setIsInSafeMode(true)
+          .setIsPreCheckComplete(false)
+          .setSCM(this)
+          .build();
     }
 
     if(configurator.getScmNodeManager() != null) {
@@ -461,7 +464,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
               scmNodeManager,
               scmMetadataStore.getPipelineTable(),
               eventQueue,
-              scmContext);
+              scmContext,
+              serviceManager);
     }
 
     if (configurator.getContainerManager() != null) {
@@ -481,11 +485,12 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
       replicationManager = configurator.getReplicationManager();
     }  else {
       replicationManager = new ReplicationManager(
-          conf.getObject(ReplicationManagerConfiguration.class),
+          conf,
           containerManager,
           containerPlacementPolicy,
           eventQueue,
           scmContext,
+          serviceManager,
           new LockManager<>(conf),
           scmNodeManager);
     }
@@ -494,7 +499,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     } else {
       scmSafeModeManager = new SCMSafeModeManager(conf,
           containerManager.getContainers(),
-          pipelineManager, eventQueue);
+          pipelineManager, eventQueue, serviceManager, scmContext);
     }
     scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
         containerManager, eventQueue, replicationManager);
@@ -1166,6 +1171,13 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
   }
 
   /**
+   * Returns SCMServiceManager.
+   */
+  public SCMServiceManager getSCMServiceManager() {
+    return serviceManager;
+  }
+
+  /**
    * Force SCM out of safe mode.
    */
   public boolean exitSafeMode() {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index c8c8243..a2a13e3 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -91,6 +92,7 @@ public class TestBlockManager {
   private static HddsProtos.ReplicationType type;
   private EventQueue eventQueue;
   private SCMContext scmContext;
+  private SCMServiceManager serviceManager;
   private int numContainerPerOwnerInPipeline;
   private OzoneConfiguration conf;
 
@@ -121,6 +123,7 @@ public class TestBlockManager {
 
     eventQueue = new EventQueue();
     scmContext = SCMContext.emptyContext();
+    serviceManager = new SCMServiceManager();
 
     scmMetadataStore = new SCMMetadataStoreImpl(conf);
     scmMetadataStore.start(conf);
@@ -131,8 +134,8 @@ public class TestBlockManager {
             nodeManager,
             scmMetadataStore.getPipelineTable(),
             eventQueue,
-            scmContext);
-    pipelineManager.allowPipelineCreation();
+            scmContext,
+            serviceManager);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -146,7 +149,7 @@ public class TestBlockManager {
             scmMetadataStore.getContainerTable());
     SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf,
         containerManager.getContainers(),
-        pipelineManager, eventQueue) {
+        pipelineManager, eventQueue, serviceManager, scmContext) {
       @Override
       public void emitSafeModeStatus() {
         // skip
@@ -173,8 +176,7 @@ public class TestBlockManager {
     factor = HddsProtos.ReplicationFactor.THREE;
     type = HddsProtos.ReplicationType.RATIS;
 
-    scm.getScmContext().onMessage(
-        new SafeModeStatus(false, true), null);
+    scm.getScmContext().updateSafeModeStatus(new SafeModeStatus(false, true));
   }
 
   @After
@@ -460,8 +462,8 @@ public class TestBlockManager {
 
   @Test
   public void testAllocateBlockFailureInSafeMode() throws Exception {
-    scm.getScmContext().onMessage(
-        new SCMSafeModeManager.SafeModeStatus(true, true), null);
+    scm.getScmContext().updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, true));
     // Test1: In safe mode expect an SCMException.
     thrown.expectMessage("SafeModePrecheck failed for "
         + "allocateBlock");
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 6d5fee2..d42db5d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -80,6 +82,8 @@ public class TestCloseContainerEventHandler {
     scmContext = SCMContext.emptyContext();
     scmMetadataStore = new SCMMetadataStoreImpl(configuration);
 
+    SCMServiceManager serviceManager = new SCMServiceManager();
+
     pipelineManager =
         PipelineManagerV2Impl.newPipelineManager(
             configuration,
@@ -87,9 +91,9 @@ public class TestCloseContainerEventHandler {
             nodeManager,
             scmMetadataStore.getPipelineTable(),
             eventQueue,
-            scmContext);
+            scmContext,
+            serviceManager);
 
-    pipelineManager.allowPipelineCreation();
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), configuration, eventQueue);
@@ -99,7 +103,10 @@ public class TestCloseContainerEventHandler {
         MockSCMHAManager.getInstance(true),
         pipelineManager,
         scmMetadataStore.getContainerTable());
-    pipelineManager.triggerPipelineCreation();
+
+    // trigger BackgroundPipelineCreator to take effect.
+    serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
+
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(
             pipelineManager, containerManager, scmContext));
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index d926a02..6e2e3ce 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.primitives.Longs;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -58,6 +59,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -88,13 +90,17 @@ public class TestReplicationManager {
   private DatanodeCommandHandler datanodeCommandHandler;
   private SimpleMockNodeManager nodeManager;
   private ContainerManagerV2 containerManager;
-  private ConfigurationSource conf;
+  private OzoneConfiguration conf;
   private SCMNodeManager scmNodeManager;
 
   @Before
   public void setup()
       throws IOException, InterruptedException, NodeNotFoundException {
     conf = new OzoneConfiguration();
+    conf.setTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        0, TimeUnit.SECONDS);
+
     containerManager = Mockito.mock(ContainerManagerV2.class);
     nodeManager = new SimpleMockNodeManager();
     eventQueue = new EventQueue();
@@ -147,30 +153,43 @@ public class TestReplicationManager {
         Mockito.any(DatanodeDetails.class)))
         .thenReturn(NodeStatus.inServiceHealthy());
 
+    SCMServiceManager serviceManager = new SCMServiceManager();
+
     replicationManager = new ReplicationManager(
-        new ReplicationManagerConfiguration(),
+        conf,
         containerManager,
         containerPlacementPolicy,
         eventQueue,
         SCMContext.emptyContext(),
+        serviceManager,
         new LockManager<>(conf),
         nodeManager);
-    replicationManager.start();
+
+    serviceManager.notifyStatusChanged();
     Thread.sleep(100L);
   }
 
   private void createReplicationManager(ReplicationManagerConfiguration rmConf)
       throws InterruptedException {
+    OzoneConfiguration config = new OzoneConfiguration();
+    config.setTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        0, TimeUnit.SECONDS);
+    config.setFromObject(rmConf);
+
+    SCMServiceManager serviceManager = new SCMServiceManager();
+
     replicationManager = new ReplicationManager(
-        rmConf,
+        config,
         containerManager,
         containerPlacementPolicy,
         eventQueue,
         SCMContext.emptyContext(),
-        new LockManager<ContainerID>(conf),
+        serviceManager,
+        new LockManager<ContainerID>(config),
         nodeManager);
 
-    replicationManager.start();
+    serviceManager.notifyStatusChanged();
     Thread.sleep(100L);
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index ba0cba5..8f9bc5d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -44,6 +44,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -100,8 +101,8 @@ public class TestSCMContainerManager {
         nodeManager,
         scmMetadataStore.getPipelineTable(),
         new EventQueue(),
-        SCMContext.emptyContext());
-    pipelineManager.allowPipelineCreation();
+        SCMContext.emptyContext(),
+        new SCMServiceManager());
     containerManager = new SCMContainerManager(conf,
         scmMetadataStore.getContainerTable(),
         scmMetadataStore.getStore(),
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
index a8e4c00..c809880 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
@@ -34,33 +34,43 @@ public class TestSCMContext {
   @Test
   public void testRaftOperations() {
     // start as follower
-    SCMContext scmContext = new SCMContext(false, 0, null, null);
+    SCMContext scmContext =
+        new SCMContext.Builder().setLeader(false).setTerm(0).build();
+
     assertFalse(scmContext.isLeader());
 
     // become leader
-    scmContext.updateIsLeaderAndTerm(true, 10);
+    scmContext.updateLeaderAndTerm(true, 10);
     assertTrue(scmContext.isLeader());
     try {
-      assertEquals(scmContext.getTerm(), 10);
+      assertEquals(scmContext.getTermOfLeader(), 10);
     } catch (NotLeaderException e) {
       fail("Should not throw nle.");
     }
 
     // step down
-    scmContext.updateIsLeaderAndTerm(false, 0);
+    scmContext.updateLeaderAndTerm(false, 0);
     assertFalse(scmContext.isLeader());
   }
 
   @Test
   public void testSafeModeOperations() {
     // in safe mode
-    SCMContext scmContext = new SCMContext(
-        true, 0, new SafeModeStatus(true, false), null);
+    SCMContext scmContext = new SCMContext.Builder()
+        .setIsInSafeMode(true)
+        .setIsPreCheckComplete(false)
+        .build();
+
     assertTrue(scmContext.isInSafeMode());
     assertFalse(scmContext.isPreCheckComplete());
 
+    // in safe mode, pass preCheck
+    scmContext.updateSafeModeStatus(new SafeModeStatus(true, true));
+    assertTrue(scmContext.isInSafeMode());
+    assertTrue(scmContext.isPreCheckComplete());
+
     // out of safe mode
-    scmContext.onMessage(new SafeModeStatus(false, true), null);
+    scmContext.updateSafeModeStatus(new SafeModeStatus(false, true));
     assertFalse(scmContext.isInSafeMode());
     assertTrue(scmContext.isPreCheckComplete());
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java
new file mode 100644
index 0000000..47b4537
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestSCMServiceManager {
+  @Test
+  public void testServiceRunWhenLeader() {
+    SCMContext scmContext = new SCMContext.Builder()
+        .setLeader(false)
+        .setTerm(1)
+        .setIsInSafeMode(true)
+        .setIsPreCheckComplete(false)
+        .build();
+
+    // A service runs when it is leader.
+    SCMService serviceRunWhenLeader = new SCMService() {
+      private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+      @Override
+      public void notifyStatusChanged() {
+        if (scmContext.isLeader()) {
+          serviceStatus = ServiceStatus.RUNNING;
+        } else {
+          serviceStatus = ServiceStatus.PAUSING;
+        }
+      }
+
+      @Override
+      public boolean shouldRun() {
+        return serviceStatus == ServiceStatus.RUNNING;
+      }
+
+      @Override
+      public String getServiceName() {
+        return "serviceRunWhenLeader";
+      }
+    };
+
+    SCMServiceManager serviceManager = new SCMServiceManager();
+    serviceManager.register(serviceRunWhenLeader);
+
+    // PAUSING at the beginning.
+    assertFalse(serviceRunWhenLeader.shouldRun());
+
+    // PAUSING when out of safe mode.
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(false, true));
+    serviceManager.notifyStatusChanged();
+    assertFalse(serviceRunWhenLeader.shouldRun());
+
+    // RUNNING when becoming leader.
+    scmContext.updateLeaderAndTerm(true, 2);
+    serviceManager.notifyStatusChanged();
+    assertTrue(serviceRunWhenLeader.shouldRun());
+
+    // RUNNING when in safe mode.
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, false));
+    serviceManager.notifyStatusChanged();
+    assertTrue(serviceRunWhenLeader.shouldRun());
+
+    // PAUSING when stepping down.
+    scmContext.updateLeaderAndTerm(false, 3);
+    serviceManager.notifyStatusChanged();
+    assertFalse(serviceRunWhenLeader.shouldRun());
+  }
+
+  @Test
+  public void setServiceRunWhenLeaderAndOutOfSafeMode() {
+    SCMContext scmContext = new SCMContext.Builder()
+        .setLeader(false)
+        .setTerm(1)
+        .setIsInSafeMode(true)
+        .setIsPreCheckComplete(false)
+        .build();
+
+    // A service runs when it is leader and out of safe mode.
+    SCMService serviceRunWhenLeaderAndOutOfSafeMode = new SCMService() {
+      private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+      @Override
+      public void notifyStatusChanged() {
+        if (scmContext.isLeader() && !scmContext.isInSafeMode()) {
+          serviceStatus = ServiceStatus.RUNNING;
+        } else {
+          serviceStatus = ServiceStatus.PAUSING;
+        }
+      }
+
+      @Override
+      public boolean shouldRun() {
+        return serviceStatus == ServiceStatus.RUNNING;
+      }
+
+      @Override
+      public String getServiceName() {
+        return "serviceRunWhenLeaderAndOutOfSafeMode";
+      }
+    };
+
+    SCMServiceManager serviceManager = new SCMServiceManager();
+    serviceManager.register(serviceRunWhenLeaderAndOutOfSafeMode);
+
+    // PAUSING at the beginning.
+    assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+    // PAUSING when out of safe mode.
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(false, true));
+    serviceManager.notifyStatusChanged();
+    assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+    // RUNNING when becoming leader.
+    scmContext.updateLeaderAndTerm(true, 2);
+    serviceManager.notifyStatusChanged();
+    assertTrue(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+    // PAUSING when in safe mode.
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, false));
+    serviceManager.notifyStatusChanged();
+    assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+    // PAUSING when stepping down.
+    scmContext.updateLeaderAndTerm(false, 3);
+    serviceManager.notifyStatusChanged();
+    assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 5bcdf4b..eb76e9f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -38,6 +38,7 @@ import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPla
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -123,7 +124,8 @@ public class TestContainerPlacement {
             scmNodeManager,
             scmMetadataStore.getPipelineTable(),
             eventQueue,
-            SCMContext.emptyContext());
+            SCMContext.emptyContext(),
+            new SCMServiceManager());
 
     return new SCMContainerManager(config, 
scmMetadataStore.getContainerTable(),
         scmMetadataStore.getStore(),
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 78e7718..23b7950 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -162,7 +162,6 @@ public class TestDeadNodeHandler {
 
     LambdaTestUtils.await(120000, 1000,
         () -> {
-          pipelineManager.triggerPipelineCreation();
           System.out.println(pipelineManager.getPipelines(RATIS, 
THREE).size());
           System.out.println(pipelineManager.getPipelines(RATIS, ONE).size());
           return pipelineManager.getPipelines(RATIS, THREE).size() > 3;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 947cd37..1759245 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -228,10 +226,4 @@ public final class MockPipelineManager implements 
PipelineManager {
   public Map<String, Integer> getPipelineInfo() {
     return null;
   }
-
-  @Override
-  public void onMessage(final SCMSafeModeManager.SafeModeStatus safeModeStatus,
-                        final EventPublisher publisher) {
-
-  }
 }
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 654fd9c..862e19b 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
@@ -71,6 +72,8 @@ public class TestPipelineManagerImpl {
   private DBStore dbStore;
   private static MockNodeManager nodeManager;
   private static int maxPipelineCount;
+  private SCMContext scmContext;
+  private SCMServiceManager serviceManager;
 
   @Before
   public void init() throws Exception {
@@ -86,6 +89,8 @@ public class TestPipelineManagerImpl {
         conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT,
             OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) /
         HddsProtos.ReplicationFactor.THREE.getNumber();
+    scmContext = SCMContext.emptyContext();
+    serviceManager = new SCMServiceManager();
   }
 
   @After
@@ -103,7 +108,8 @@ public class TestPipelineManagerImpl {
         new MockNodeManager(true, 20),
         SCMDBDefinition.PIPELINES.getTable(dbStore),
         new EventQueue(),
-        SCMContext.emptyContext());
+        scmContext,
+        serviceManager);
   }
 
   private PipelineManagerV2Impl createPipelineManager(
@@ -113,7 +119,8 @@ public class TestPipelineManagerImpl {
         new MockNodeManager(true, 20),
         SCMDBDefinition.PIPELINES.getTable(dbStore),
         new EventQueue(),
-        SCMContext.emptyContext());
+        SCMContext.emptyContext(),
+        serviceManager);
   }
 
   @Test
@@ -122,7 +129,6 @@ public class TestPipelineManagerImpl {
     PipelineManagerV2Impl pipelineManager =
         createPipelineManager(true, buffer1);
     Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline1 = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -141,7 +147,6 @@ public class TestPipelineManagerImpl {
     // Should be able to load previous pipelines.
     Assert.assertFalse(pipelineManager2.getPipelines().isEmpty());
     Assert.assertEquals(2, pipelineManager.getPipelines().size());
-    pipelineManager2.allowPipelineCreation();
     Pipeline pipeline3 = pipelineManager2.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     buffer2.close();
@@ -155,7 +160,6 @@ public class TestPipelineManagerImpl {
   public void testCreatePipelineShouldFailOnFollower() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
     Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
-    pipelineManager.allowPipelineCreation();
     try {
       pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
@@ -174,7 +178,6 @@ public class TestPipelineManagerImpl {
         createPipelineManager(true, buffer);
     Table<PipelineID, Pipeline> pipelineStore =
         SCMDBDefinition.PIPELINES.getTable(dbStore);
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -218,7 +221,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testOpenPipelineShouldFailOnFollower() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -240,7 +242,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testActivatePipelineShouldFailOnFollower() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -262,7 +263,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testDeactivatePipelineShouldFailOnFollower() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -284,7 +284,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testRemovePipeline() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     // Create a pipeline
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
@@ -327,7 +326,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testClosePipelineShouldFailOnFollower() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -349,10 +347,9 @@ public class TestPipelineManagerImpl {
   @Test
   public void testPipelineReport() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     SCMSafeModeManager scmSafeModeManager =
         new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
-            new EventQueue());
+            new EventQueue(), serviceManager, scmContext);
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
@@ -400,7 +397,6 @@ public class TestPipelineManagerImpl {
   @Test
   public void testPipelineCreationFailedMetric() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
 
     // No pipeline at start
     MetricsRecordBuilder metrics = getMetrics(
@@ -457,10 +453,7 @@ public class TestPipelineManagerImpl {
     DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore);
     PipelineManagerV2Impl pipelineManager =
         createPipelineManager(true, buffer1);
-    pipelineManager.allowPipelineCreation();
 
-    pipelineManager.onMessage(
-        new SCMSafeModeManager.SafeModeStatus(true, true), null);
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
@@ -473,8 +466,8 @@ public class TestPipelineManagerImpl {
         pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
 
     SCMSafeModeManager scmSafeModeManager =
-        new SCMSafeModeManager(new OzoneConfiguration(),
-            new ArrayList<>(), pipelineManager, new EventQueue());
+        new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(),
+            pipelineManager, new EventQueue(), serviceManager, scmContext);
     PipelineReportHandler pipelineReportHandler =
         new PipelineReportHandler(scmSafeModeManager, pipelineManager,
             SCMContext.emptyContext(), conf);
@@ -508,7 +501,6 @@ public class TestPipelineManagerImpl {
         TimeUnit.MILLISECONDS);
 
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
@@ -541,7 +533,6 @@ public class TestPipelineManagerImpl {
         TimeUnit.MILLISECONDS);
 
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
-    pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
@@ -577,6 +568,9 @@ public class TestPipelineManagerImpl {
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
         TimeUnit.MILLISECONDS);
 
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, false));
+
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     try {
       pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
@@ -597,8 +591,8 @@ public class TestPipelineManagerImpl {
             HddsProtos.ReplicationFactor.ONE).contains(pipeline));
 
     // Simulate safemode check exiting.
-    pipelineManager.onMessage(
-        new SCMSafeModeManager.SafeModeStatus(true, true), null);
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, true));
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
@@ -616,17 +610,22 @@ public class TestPipelineManagerImpl {
         TimeUnit.MILLISECONDS);
 
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, false));
     Assert.assertTrue(pipelineManager.getSafeModeStatus());
     Assert.assertFalse(pipelineManager.isPipelineCreationAllowed());
+
     // First pass pre-check as true, but safemode still on
-    pipelineManager.onMessage(
-        new SCMSafeModeManager.SafeModeStatus(true, true), null);
+    // Simulate safemode check exiting.
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(true, true));
     Assert.assertTrue(pipelineManager.getSafeModeStatus());
     Assert.assertTrue(pipelineManager.isPipelineCreationAllowed());
 
     // Then also turn safemode off
-    pipelineManager.onMessage(
-        new SCMSafeModeManager.SafeModeStatus(false, true), null);
+    scmContext.updateSafeModeStatus(
+        new SCMSafeModeManager.SafeModeStatus(false, true));
     Assert.assertFalse(pipelineManager.getSafeModeStatus());
     Assert.assertTrue(pipelineManager.isPipelineCreationAllowed());
     pipelineManager.close();
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2226a43..24cb4b5 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
@@ -221,7 +222,8 @@ public class TestSCMPipelineManager {
 
     SCMSafeModeManager scmSafeModeManager =
         new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
-            eventQueue);
+            eventQueue, new SCMServiceManager(),
+            SCMContext.emptyContext());
 
     // create a pipeline in allocated state with no dns yet reported
     Pipeline pipeline = pipelineManager
@@ -494,8 +496,10 @@ public class TestSCMPipelineManager {
         pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
 
     SCMSafeModeManager scmSafeModeManager =
-        new SCMSafeModeManager(new OzoneConfiguration(),
-            new ArrayList<>(), pipelineManager, eventQueue);
+        new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(),
+            pipelineManager, eventQueue,
+            new SCMServiceManager(),
+            SCMContext.emptyContext());
     PipelineReportHandler pipelineReportHandler =
         new PipelineReportHandler(scmSafeModeManager, pipelineManager,
             SCMContext.emptyContext(), conf);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 19f1f30..e7dbeeb 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -55,6 +56,8 @@ public class TestHealthyPipelineSafeModeRule {
   public void testHealthyPipelineSafeModeRuleWithNoPipelines()
       throws Exception {
     EventQueue eventQueue = new EventQueue();
+    SCMServiceManager serviceManager = new SCMServiceManager();
+    SCMContext scmContext = SCMContext.emptyContext();
     List<ContainerInfo> containers =
             new ArrayList<>(HddsTestUtils.getContainerInfo(1));
 
@@ -79,14 +82,16 @@ public class TestHealthyPipelineSafeModeRule {
               nodeManager,
               scmMetadataStore.getPipelineTable(),
               eventQueue,
-              SCMContext.emptyContext());
+              scmContext,
+              serviceManager);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
       pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
           mockRatisProvider);
       SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
-          config, containers, pipelineManager, eventQueue);
+          config, containers, pipelineManager, eventQueue,
+          serviceManager, scmContext);
 
       HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
@@ -105,6 +110,8 @@ public class TestHealthyPipelineSafeModeRule {
         TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
 
     EventQueue eventQueue = new EventQueue();
+    SCMServiceManager serviceManager = new SCMServiceManager();
+    SCMContext scmContext = SCMContext.emptyContext();
     List<ContainerInfo> containers =
             new ArrayList<>(HddsTestUtils.getContainerInfo(1));
 
@@ -129,8 +136,8 @@ public class TestHealthyPipelineSafeModeRule {
               nodeManager,
               scmMetadataStore.getPipelineTable(),
               eventQueue,
-              SCMContext.emptyContext());
-      pipelineManager.allowPipelineCreation();
+              scmContext,
+              serviceManager);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -163,7 +170,8 @@ public class TestHealthyPipelineSafeModeRule {
       MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
 
       SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
-          config, containers, pipelineManager, eventQueue);
+          config, containers, pipelineManager, eventQueue,
+          serviceManager, scmContext);
 
       HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
@@ -199,6 +207,8 @@ public class TestHealthyPipelineSafeModeRule {
         TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
 
     EventQueue eventQueue = new EventQueue();
+    SCMServiceManager serviceManager = new SCMServiceManager();
+    SCMContext scmContext = SCMContext.emptyContext();
     List<ContainerInfo> containers =
             new ArrayList<>(HddsTestUtils.getContainerInfo(1));
 
@@ -224,9 +234,9 @@ public class TestHealthyPipelineSafeModeRule {
               nodeManager,
               scmMetadataStore.getPipelineTable(),
               eventQueue,
-              SCMContext.emptyContext());
+              scmContext,
+              serviceManager);
 
-      pipelineManager.allowPipelineCreation();
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -258,7 +268,8 @@ public class TestHealthyPipelineSafeModeRule {
       MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
 
       SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
-          config, containers, pipelineManager, eventQueue);
+          config, containers, pipelineManager, eventQueue,
+          serviceManager, scmContext);
 
       HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index b915899..6de81dc 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -62,6 +63,8 @@ public class TestOneReplicaPipelineSafeModeRule {
   private OneReplicaPipelineSafeModeRule rule;
   private PipelineManagerV2Impl pipelineManager;
   private EventQueue eventQueue;
+  private SCMServiceManager serviceManager;
+  private SCMContext scmContext;
   private MockNodeManager mockNodeManager;
 
   private void setup(int nodes, int pipelineFactorThreeCount,
@@ -79,6 +82,8 @@ public class TestOneReplicaPipelineSafeModeRule {
     mockNodeManager = new MockNodeManager(true, nodes);
 
     eventQueue = new EventQueue();
+    serviceManager = new SCMServiceManager();
+    scmContext = SCMContext.emptyContext();
 
     SCMMetadataStore scmMetadataStore =
             new SCMMetadataStoreImpl(ozoneConfiguration);
@@ -89,8 +94,8 @@ public class TestOneReplicaPipelineSafeModeRule {
         mockNodeManager,
         scmMetadataStore.getPipelineTable(),
         eventQueue,
-        SCMContext.emptyContext());
-    pipelineManager.allowPipelineCreation();
+        scmContext,
+        serviceManager);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
@@ -105,7 +110,7 @@ public class TestOneReplicaPipelineSafeModeRule {
 
     SCMSafeModeManager scmSafeModeManager =
         new SCMSafeModeManager(ozoneConfiguration, containers,
-            pipelineManager, eventQueue);
+            pipelineManager, eventQueue, serviceManager, scmContext);
 
     rule = scmSafeModeManager.getOneReplicaPipelineSafeModeRule();
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index e8dbc2e..523f964 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,6 +39,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -47,7 +47,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -73,6 +72,7 @@ public class TestSCMSafeModeManager {
 
   private static EventQueue queue;
   private SCMContext scmContext;
+  private SCMServiceManager serviceManager;
   private SCMSafeModeManager scmSafeModeManager;
   private static OzoneConfiguration config;
   private List<ContainerInfo> containers = Collections.emptyList();
@@ -88,7 +88,8 @@ public class TestSCMSafeModeManager {
   @Before
   public void setUp() {
     queue = new EventQueue();
-    scmContext = SCMContext.emptyContext();
+    scmContext = new SCMContext.Builder().build();
+    serviceManager = new SCMServiceManager();
     config = new OzoneConfiguration();
     config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
         false);
@@ -120,7 +121,7 @@ public class TestSCMSafeModeManager {
   @Test
   public void testSafeModeStateWithNullContainers() {
     new SCMSafeModeManager(config, Collections.emptyList(),
-        null, queue);
+        null, queue, serviceManager, scmContext);
   }
 
   private void testSafeMode(int numContainers) throws Exception {
@@ -132,7 +133,7 @@ public class TestSCMSafeModeManager {
       container.setState(HddsProtos.LifeCycleState.CLOSED);
     }
     scmSafeModeManager = new SCMSafeModeManager(
-        config, containers, null, queue);
+        config, containers, null, queue, serviceManager, scmContext);
 
     assertTrue(scmSafeModeManager.getInSafeMode());
     queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
@@ -155,59 +156,6 @@ public class TestSCMSafeModeManager {
   }
 
   @Test
-  public void testDelayedEventNotification() throws Exception {
-
-    List<SafeModeStatus> delayedSafeModeEvents = new ArrayList<>();
-    List<SafeModeStatus> safeModeEvents = new ArrayList<>();
-
-    //given
-    EventQueue eventQueue = new EventQueue();
-    eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
-        (safeModeStatus, publisher) -> safeModeEvents.add(safeModeStatus));
-    eventQueue.addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS,
-        (safeModeStatus, publisher) -> delayedSafeModeEvents
-            .add(safeModeStatus));
-
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    ozoneConfiguration
-        
.setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
-            3, TimeUnit.SECONDS);
-    ozoneConfiguration
-        .setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
-
-    scmSafeModeManager = new SCMSafeModeManager(
-        ozoneConfiguration, containers, null, eventQueue);
-
-    //when
-    scmSafeModeManager.setInSafeMode(true);
-    scmSafeModeManager.setPreCheckComplete(true);
-
-    scmSafeModeManager.emitSafeModeStatus();
-    eventQueue.processAll(1000L);
-
-    //then
-    Assert.assertEquals(1, delayedSafeModeEvents.size());
-    Assert.assertEquals(1, safeModeEvents.size());
-
-    //when
-    scmSafeModeManager.setInSafeMode(false);
-    scmSafeModeManager.setPreCheckComplete(true);
-
-    scmSafeModeManager.emitSafeModeStatus();
-    eventQueue.processAll(1000L);
-
-    //then
-    Assert.assertEquals(2, safeModeEvents.size());
-    //delayed messages are not yet sent (unless JVM is paused for 3 seconds)
-    Assert.assertEquals(1, delayedSafeModeEvents.size());
-
-    //event will be triggered after 3 seconds (see previous config)
-    GenericTestUtils.waitFor(() -> delayedSafeModeEvents.size() == 2,
-        300,
-        6000);
-
-  }
-  @Test
   public void testSafeModeExitRule() throws Exception {
     containers = new ArrayList<>();
     int numContainers = 100;
@@ -218,7 +166,7 @@ public class TestSCMSafeModeManager {
       container.setState(HddsProtos.LifeCycleState.CLOSED);
     }
     scmSafeModeManager = new SCMSafeModeManager(
-        config, containers, null, queue);
+        config, containers, null, queue, serviceManager, scmContext);
 
     long cutOff = (long) Math.ceil(numContainers * config.getDouble(
         HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT,
@@ -311,9 +259,10 @@ public class TestSCMSafeModeManager {
               mockNodeManager,
               scmMetadataStore.getPipelineTable(),
               queue,
-              scmContext);
+              scmContext,
+              serviceManager);
       scmSafeModeManager = new SCMSafeModeManager(
-          conf, containers, pipelineManager, queue);
+          conf, containers, pipelineManager, queue, serviceManager, 
scmContext);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
     } catch (IllegalArgumentException ex) {
       GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" 
+
@@ -335,9 +284,10 @@ public class TestSCMSafeModeManager {
               mockNodeManager,
               scmMetadataStore.getPipelineTable(),
               queue,
-              scmContext);
+              scmContext,
+              serviceManager);
       scmSafeModeManager = new SCMSafeModeManager(
-          conf, containers, pipelineManager, queue);
+          conf, containers, pipelineManager, queue, serviceManager, 
scmContext);
       fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
     } catch (IllegalArgumentException ex) {
       GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" 
+
@@ -358,9 +308,10 @@ public class TestSCMSafeModeManager {
               mockNodeManager,
               scmMetadataStore.getPipelineTable(),
               queue,
-              scmContext);
+              scmContext,
+              serviceManager);
       scmSafeModeManager = new SCMSafeModeManager(
-          conf, containers, pipelineManager, queue);
+          conf, containers, pipelineManager, queue, serviceManager, 
scmContext);
       fail("testFailWithIncorrectValueForSafeModePercent");
     } catch (IllegalArgumentException ex) {
       GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" 
+
@@ -388,13 +339,14 @@ public class TestSCMSafeModeManager {
             mockNodeManager,
             scmMetadataStore.getPipelineTable(),
             queue,
-            scmContext);
+            scmContext,
+            serviceManager);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
             pipelineManager.getStateManager(), config);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
-    pipelineManager.allowPipelineCreation();
+    pipelineManager.getBackgroundPipelineCreator().stop();
 
     for (int i = 0; i < pipelineCount; i++) {
       // Create pipeline
@@ -412,8 +364,8 @@ public class TestSCMSafeModeManager {
       container.setState(HddsProtos.LifeCycleState.CLOSED);
     }
 
-    scmSafeModeManager = new SCMSafeModeManager(conf, containers,
-        pipelineManager, queue);
+    scmSafeModeManager = new SCMSafeModeManager(
+        conf, containers, pipelineManager, queue, serviceManager, scmContext);
 
     assertTrue(scmSafeModeManager.getInSafeMode());
     testContainerThreshold(containers, 1.0);
@@ -523,9 +475,8 @@ public class TestSCMSafeModeManager {
     OzoneConfiguration conf = new OzoneConfiguration(config);
     conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, false);
     PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
-    Mockito.doNothing().when(pipelineManager).startPipelineCreator();
-    scmSafeModeManager =
-        new SCMSafeModeManager(conf, containers, pipelineManager, queue);
+    scmSafeModeManager = new SCMSafeModeManager(
+        conf, containers, pipelineManager, queue, serviceManager, scmContext);
     assertFalse(scmSafeModeManager.getInSafeMode());
   }
 
@@ -557,7 +508,7 @@ public class TestSCMSafeModeManager {
     }
 
     scmSafeModeManager = new SCMSafeModeManager(
-        config, containers, null, queue);
+        config, containers, null, queue, serviceManager, scmContext);
 
     assertTrue(scmSafeModeManager.getInSafeMode());
 
@@ -581,7 +532,7 @@ public class TestSCMSafeModeManager {
     OzoneConfiguration conf = new OzoneConfiguration(config);
     conf.setInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE, numOfDns);
     scmSafeModeManager = new SCMSafeModeManager(
-        conf, containers, null, queue);
+        conf, containers, null, queue, serviceManager, scmContext);
 
     // Assert SCM is in Safe mode.
     assertTrue(scmSafeModeManager.getInSafeMode());
@@ -639,14 +590,14 @@ public class TestSCMSafeModeManager {
               nodeManager,
               scmMetadataStore.getPipelineTable(),
               queue,
-              scmContext);
+              scmContext,
+              serviceManager);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
       pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
           mockRatisProvider);
-      pipelineManager.allowPipelineCreation();
 
       Pipeline pipeline = pipelineManager.createPipeline(
           HddsProtos.ReplicationType.RATIS,
@@ -656,7 +607,8 @@ public class TestSCMSafeModeManager {
       MockRatisPipelineProvider.markPipelineHealthy(pipeline);
 
       scmSafeModeManager = new SCMSafeModeManager(
-          config, containers, pipelineManager, queue);
+          config, containers, pipelineManager, queue, serviceManager,
+          scmContext);
 
       queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
           HddsTestUtils.createNodeRegistrationContainerReport(containers));
@@ -703,7 +655,8 @@ public class TestSCMSafeModeManager {
             nodeManager,
             scmMetadataStore.getPipelineTable(),
             queue,
-            scmContext);
+            scmContext,
+            serviceManager);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -714,7 +667,7 @@ public class TestSCMSafeModeManager {
     SafeModeEventHandler smHandler = new SafeModeEventHandler();
     queue.addHandler(SCMEvents.SAFE_MODE_STATUS, smHandler);
     scmSafeModeManager = new SCMSafeModeManager(
-        config, containers, pipelineManager, queue);
+        config, containers, pipelineManager, queue, serviceManager, 
scmContext);
 
     // Assert SCM is in Safe mode.
     assertTrue(scmSafeModeManager.getInSafeMode());
@@ -739,9 +692,6 @@ public class TestSCMSafeModeManager {
     Assert.assertEquals(true, smHandler.getPreCheckComplete());
     Assert.assertEquals(true, smHandler.getIsInSafeMode());
 
-    // Create a pipeline and ensure safemode is exited.
-    pipelineManager.allowPipelineCreation();
-
     /* There is a race condition where the background pipeline creation
      * task creates the pipeline before the following create call.
      * So wrapping it with try..catch.
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 26852f0..657f5db 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -160,7 +161,9 @@ public class TestRatisPipelineCreateAndDestroy {
         .getScmNodeManager().getNodeCount(NodeStatus.inServiceHealthy()) >=
         HddsProtos.ReplicationFactor.THREE.getNumber()) {
       // make sure pipelines is created after node start
-      pipelineManager.triggerPipelineCreation();
+      cluster.getStorageContainerManager()
+          .getSCMServiceManager()
+          .notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
       waitForPipelines(1);
     }
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 2a468f5..635fe30 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -73,6 +73,7 @@ import 
org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
@@ -647,10 +648,16 @@ public class TestStorageContainerManager {
           dnUuid, closeContainerCommand);
 
       GenericTestUtils.waitFor(() -> {
-        return replicationManager.isRunning();
+        SCMContext scmContext
+            = cluster.getStorageContainerManager().getScmContext();
+        return !scmContext.isInSafeMode() && scmContext.isLeader();
       }, 1000, 25000);
 
+      // After safe mode is off, ReplicationManager starts to run with a delay.
+      Thread.sleep(5000);
       // Give ReplicationManager some time to process the containers.
+      cluster.getStorageContainerManager()
+          .getReplicationManager().processContainersNow();
       Thread.sleep(5000);
 
       verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 5e08f2d..69f07df 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -141,7 +141,7 @@ public class TestContainerReplication {
     SCMCommand<?> command = new ReplicateContainerCommand(containerId,
         sourcePipelines.getNodes());
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
             command);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 00a338b..b45603a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -147,7 +147,7 @@ public class TestCloseContainerByPipeline {
     SCMCommand<?> command = new CloseContainerCommand(
         containerID, pipeline.getId());
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(), command);
     GenericTestUtils
@@ -198,7 +198,7 @@ public class TestCloseContainerByPipeline {
     SCMCommand<?> command = new CloseContainerCommand(
         containerID, pipeline.getId());
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(), command);
 
@@ -251,8 +251,8 @@ public class TestCloseContainerByPipeline {
       //send the order to close the container
       SCMCommand<?> command = new CloseContainerCommand(
           containerID, pipeline.getId());
-      command.setTerm(
-          cluster.getStorageContainerManager().getScmContext().getTerm());
+      command.setTerm(cluster.getStorageContainerManager()
+          .getScmContext().getTermOfLeader());
       cluster.getStorageContainerManager().getScmNodeManager()
           .addDatanodeCommand(details.getUuid(), command);
       int index = cluster.getHddsDatanodeIndex(details);
@@ -332,7 +332,7 @@ public class TestCloseContainerByPipeline {
     SCMCommand<?> command = new CloseContainerCommand(
         containerID, pipeline.getId(), true);
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(), command);
     GenericTestUtils
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 870486a..c165f92 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -123,7 +123,7 @@ public class TestCloseContainerHandler {
     SCMCommand<?> command = new CloseContainerCommand(
         containerId.getId(), pipeline.getId());
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(), command);
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index 40998e1..ab20505 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -136,7 +136,7 @@ public class TestDeleteContainerHandler {
     SCMCommand<?> command = new CloseContainerCommand(
         containerId.getId(), pipeline.getId());
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     GenericTestUtils.waitFor(() ->
@@ -154,7 +154,7 @@ public class TestDeleteContainerHandler {
     // send delete container to the datanode
     command = new DeleteContainerCommand(containerId.getId(), false);
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     GenericTestUtils.waitFor(() ->
@@ -192,7 +192,7 @@ public class TestDeleteContainerHandler {
     SCMCommand<?> command = new DeleteContainerCommand(
         containerId.getId(), false);
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     // Here it should not delete it, and the container should exist in the
@@ -216,7 +216,7 @@ public class TestDeleteContainerHandler {
     // container
     command = new DeleteContainerCommand(containerId.getId(), true);
     command.setTerm(
-        cluster.getStorageContainerManager().getScmContext().getTerm());
+        
cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
     nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
 
     GenericTestUtils.waitFor(() ->


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to