This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-2034
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit caefc4f5bb10babcba429d77997733c85b249187
Author: Sammi Chen <sammic...@apache.org>
AuthorDate: Tue Oct 15 14:36:14 2019 +0800

    HDDS-2034. Async RATIS pipeline creation and destroy through datanode heartbeat commands.
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  12 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |   7 ++
 .../common/src/main/resources/ozone-default.xml    |  30 +++--
 .../common/report/PipelineReportPublisher.java     |   1 +
 .../common/statemachine/DatanodeStateMachine.java  |  14 +++
 .../CloseContainerCommandHandler.java              |  11 +-
 .../ClosePipelineCommandHandler.java               | 121 ++++++++++++++++++
 .../commandhandler/CommandHandler.java             |   2 +-
 .../CreatePipelineCommandHandler.java              | 135 +++++++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |  22 ++++
 .../common/transport/server/XceiverServerSpi.java  |  18 +++
 .../transport/server/ratis/XceiverServerRatis.java |  36 ++++++
 .../protocol/commands/ClosePipelineCommand.java    |  73 +++++++++++
 .../protocol/commands/CreatePipelineCommand.java   | 100 +++++++++++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |  23 ++++
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   3 +
 .../hdds/scm/container/ContainerStateManager.java  |  17 +--
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  12 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  13 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  15 ++-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |   2 +
 .../hdds/scm/pipeline/PipelineReportHandler.java   |  34 +++---
 .../hdds/scm/pipeline/PipelineStateManager.java    |   2 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 132 +++++++-------------
 .../hdds/scm/pipeline/RatisPipelineUtils.java      | 103 ----------------
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 102 +++++++++++++---
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |  10 ++
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   5 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  96 ++++++---------
 .../safemode/OneReplicaPipelineSafeModeRule.java   |  86 ++++++-------
 .../hdds/scm/safemode/SCMSafeModeManager.java      |  30 ++++-
 .../hadoop/hdds/scm/safemode/SafeModeHandler.java  |   5 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |   1 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  20 +++
 .../hdds/scm/server/StorageContainerManager.java   |   3 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  13 ++
 .../container/TestCloseContainerEventHandler.java  |  14 ++-
 .../scm/container/TestSCMContainerManager.java     |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   2 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  11 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   9 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |   3 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  45 +++----
 .../TestOneReplicaPipelineSafeModeRule.java        |  35 +-----
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  48 +++-----
 .../TestContainerStateManagerIntegration.java      |   4 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |   4 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  15 ++-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  37 +++---
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  12 +-
 50 files changed, 1039 insertions(+), 513 deletions(-)
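
At a high level, this patch replaces SCM-side Ratis group creation (the old
RatisPipelineUtils RaftClient calls) with commands that the datanodes execute
themselves. A rough map of the new flow, as implemented in the per-file diffs
below:

    SCM  RatisPipelineProvider.create()   -> pipeline enters ALLOCATED; a
         CreatePipelineCommand is fired at each member datanode
    DN   HeartbeatEndpointTask            -> receives createPipelineCommand
    DN   CreatePipelineCommandHandler     -> XceiverServerRatis.addGroup()
    DN   next heartbeat                   -> pipeline report back to SCM
    SCM  PipelineReportHandler            -> ALLOCATED -> OPEN once all
         replicas report; fires OPEN_PIPELINE for the safe-mode rules
    SCM  PipelineReportHandler            -> ClosePipelineCommand back to the
         datanode when a reported pipeline is unknown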

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
   public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
       "hdds.scm.safemode.pipeline-availability.check";
   public static final boolean
-      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+  public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+      "hdds.scm.safemode.pipeline.creation";
+  public static final boolean
+      HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
 
   // % of containers which should have at least one reported replica
   // before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
       "hdds.scm.safemode.threshold.pct";
   public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
 
-
   // percentage of healthy pipelines, where all 3 datanodes are reported in the
   // pipeline.
   public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
       "hdds.scm.safemode.healthy.pipelie.pct";
   public static final double
       HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+  // number of healthy RATIS pipelines (ONE or THREE factor)
+  public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+      "hdds.scm.safemode.min.pipeline";
+  public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
 
   public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
       "hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 2828f6e..f2ca2fa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -50,6 +50,7 @@ public final class Pipeline {
   private Map<DatanodeDetails, Long> nodeStatus;
   // nodes with ordered distance to client
  private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
+  private final long creationTime;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -64,6 +65,7 @@ public final class Pipeline {
     this.factor = factor;
     this.state = state;
     this.nodeStatus = nodeStatus;
+    this.creationTime = System.currentTimeMillis();
   }
 
   /**
@@ -134,6 +136,11 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
+  public boolean isAllocationTimeout() {
+    //TODO: define a system property to control the timeout value
+    return false;
+  }
+
   public void setNodesInOrder(List<DatanodeDetails> nodes) {
     nodesInOrder.set(nodes);
   }
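
The new creationTime field only feeds the stubbed isAllocationTimeout() above;
the TODO hints at the eventual shape. A hypothetical method-level sketch inside
Pipeline, assuming the timeout is later made configurable (the constant below
is illustrative, not part of this patch):

    // Hypothetical completion of the TODO: an ALLOCATED pipeline that has
    // not opened within the limit is considered timed out.
    private static final long ALLOCATION_TIMEOUT_MS = 5 * 60 * 1000L; // assumed

    public boolean isAllocationTimeout() {
      return state == PipelineState.ALLOCATED
          && System.currentTimeMillis() - creationTime > ALLOCATION_TIMEOUT_MS;
    }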
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b0a59fa..eff4f35 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -319,15 +319,6 @@
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
-    <name>hdds.command.status.report.interval</name>
-    <value>60000ms</value>
-    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
-    <description>Time interval of the datanode to send status of command
-      execution. Each datanode periodically the execution status of commands
-      received from SCM to SCM. Unit could be defined with postfix
-      (ns,ms,s,m,h,d)</description>
-  </property>
-  <property>
     <name>hdds.pipeline.report.interval</name>
     <value>60000ms</value>
     <tag>OZONE, PIPELINE, MANAGEMENT</tag>
@@ -1314,7 +1305,7 @@
 
   <property>
     <name>hdds.scm.safemode.pipeline-availability.check</name>
-    <value>false</value>
+    <value>true</value>
     <tag>HDDS,SCM,OPERATION</tag>
     <description>
       Boolean value to enable pipeline availability check during SCM safe mode.
@@ -1400,6 +1391,25 @@
   </property>
 
   <property>
+    <name>hdds.scm.safemode.pipeline.creation</name>
+    <value>true</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Boolean value to enable background pipeline creation in SCM safe mode.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.scm.safemode.min.pipeline</name>
+    <value>1</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Minimum number of RATIS pipelines required to exit SCM safe mode.
+      Considered only when "hdds.scm.safemode.pipeline.creation" is true.
+    </description>
+  </property>
+
+  <property>
     <name>hdds.lock.max.concurrency</name>
     <value>100</value>
     <tag>HDDS</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
index e7f4347..e1dd098 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -68,6 +68,7 @@ public class PipelineReportPublisher extends
 
   @Override
   protected PipelineReportsProto getReport() {
+    System.out.println("Pipeline Report Generate");
     return getContext().getParent().getContainer().getPipelineReport();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index c9eb702..926f19c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .ClosePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CreatePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteContainerCommandHandler;
@@ -126,6 +130,8 @@ public class DatanodeStateMachine implements Closeable {
             conf))
         .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
         .addHandler(new DeleteContainerCommandHandler())
+        .addHandler(new ClosePipelineCommandHandler())
+        .addHandler(new CreatePipelineCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
@@ -486,4 +492,12 @@ public class DatanodeStateMachine implements Closeable {
   public ReplicationSupervisor getSupervisor() {
     return supervisor;
   }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public CertificateClient getCertificateClient() {
+    return dnCertClient;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 2dec08f..26031d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handler for close container command received from SCM.
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
 
-  private int invocationCount;
+  private AtomicLong invocationCount = new AtomicLong(0);
   private long totalTime;
 
   /**
@@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
-    invocationCount++;
+    invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
     final DatanodeDetails datanodeDetails = context.getParent()
         .getDatanodeDetails();
@@ -161,7 +162,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public int getInvocationCount() {
-    return invocationCount;
+    return (int)invocationCount.get();
   }
 
   /**
@@ -171,8 +172,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
     }
     return 0;
   }
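
The int-to-AtomicLong switch here (repeated in the two new handlers below)
makes the counter safe if command handlers are ever invoked from more than one
dispatcher thread: a plain int++ is a read-modify-write and can lose updates.
A minimal stand-alone illustration of the pattern:

    import java.util.concurrent.atomic.AtomicLong;

    class InvocationCounterSketch {
      private int unsafeCount;                          // can lose updates
      private final AtomicLong safeCount = new AtomicLong();

      void onInvocation() {
        unsafeCount++;                // not atomic: read, add, write
        safeCount.incrementAndGet();  // atomic
      }
    }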
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
new file mode 100644
index 0000000..a31387a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for close pipeline command received from SCM.
+ */
+public class ClosePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a closePipelineCommand handler.
+   */
+  public ClosePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+    final ClosePipelineCommandProto closeCommand =
+        ((ClosePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID();
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.removeGroup(pipelineID);
+      context.getParent().triggerHeartbeat();
+      LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+          dn.getUuidString());
+    } catch (IOException e) {
+      LOG.error("Can't close pipeline #{}", pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 1ea0ea8..dca02f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -68,7 +68,7 @@ public interface CommandHandler {
   default void updateCommandStatus(StateContext context, SCMCommand command,
       Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
     if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
-      log.debug("{} with Id:{} not found.", command.getType(),
+      log.warn("{} with Id:{} not found.", command.getType(),
           command.getId());
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
new file mode 100644
index 0000000..3a60d7e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for create pipeline command received from SCM.
+ */
+public class CreatePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a createPipelineCommand handler.
+   */
+  public CreatePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent()
+        .getDatanodeDetails();
+    final CreatePipelineCommandProto createCommand =
+        ((CreatePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
+    Collection<DatanodeDetails> peers =
+        createCommand.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.addGroup(pipelineID, peers);
+      LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.",
+          createCommand.getType(), createCommand.getFactor(), pipelineID,
+          dn.getUuidString());
+      // Trigger heartbeat report
+      context.addReport(context.getParent().getContainer().getPipelineReport());
+      context.getParent().triggerHeartbeat();
+    } catch (NotLeaderException e) {
+      LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+    } catch (IOException e) {
+      LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+          createCommand.getFactor(), pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(deleteContainerCommand);
         break;
+      case createPipelineCommand:
+        CreatePipelineCommand createPipelineCommand =
+            CreatePipelineCommand.getFromProtobuf(
+                commandResponseProto.getCreatePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM create pipeline request {}",
+              createPipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(createPipelineCommand);
+        break;
+      case closePipelineCommand:
+        ClosePipelineCommand closePipelineCommand =
+            ClosePipelineCommand.getFromProtobuf(
+                commandResponseProto.getClosePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM close pipeline request {}",
+              closePipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(closePipelineCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 4e0d343..01f463c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReport;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
@@ -60,6 +62,22 @@ public interface XceiverServerSpi {
    */
   boolean isExist(HddsProtos.PipelineID pipelineId);
 
+
+  /**
+   * Join a new pipeline.
+   */
+  default void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+  }
+
+
+  /**
+   * Exit a pipeline.
+   */
+  default void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+  }
+
   /**
    * Get pipeline report for the XceiverServer instance.
    * @return list of report for each pipeline.
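
Shipping addGroup/removeGroup as default no-ops keeps the SPI source
compatible: only the Ratis write channel overrides them (see XceiverServerRatis
below), while non-Ratis servers compile unchanged. A stand-in sketch of the
pattern, using simplified types rather than the real SPI:

    import java.io.IOException;
    import java.util.Collection;

    interface WriteChannelSketch {
      // Default no-ops: servers without replication groups ignore these.
      default void addGroup(String pipelineId, Collection<String> peers)
          throws IOException {
      }

      default void removeGroup(String pipelineId) throws IOException {
      }
    }

    // Compiles without overriding either group method.
    class StandaloneChannelSketch implements WriteChannelSketch {
    }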
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 80e91cd..9733b8e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -617,6 +618,41 @@ public final class XceiverServerRatis extends XceiverServer {
     return pipelineIDs;
   }
 
+  @Override
+  public void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+    final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId);
+    final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
+    final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
+    GroupManagementRequest request = GroupManagementRequest.newAdd(
+        clientId, server.getId(), nextCallId(), group);
+
+    RaftClientReply reply;
+    try {
+      reply = server.groupManagement(request);
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    processReply(reply);
+  }
+
+  @Override
+  public void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+    GroupManagementRequest request = GroupManagementRequest.newRemove(
+        clientId, server.getId(), nextCallId(),
+        RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()),
+        true);
+
+    RaftClientReply reply;
+    try {
+      reply = server.groupManagement(request);
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    processReply(reply);
+  }
+
   void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(groupId, roleInfoProto);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
new file mode 100644
index 0000000..1f75bc3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+/**
+ * Asks datanode to close a pipeline.
+ */
+public class ClosePipelineCommand
+    extends SCMCommand<ClosePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+
+  public ClosePipelineCommand(final PipelineID pipelineID) {
+    super();
+    this.pipelineID = pipelineID;
+  }
+
+  public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  @Override
+  public ClosePipelineCommandProto getProto() {
+    ClosePipelineCommandProto.Builder builder =
+        ClosePipelineCommandProto.newBuilder();
+    builder.setCmdId(getId());
+    builder.setPipelineID(pipelineID.getProtobuf());
+    return builder.build();
+  }
+
+  public static ClosePipelineCommand getFromProtobuf(
+      ClosePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    return new ClosePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
new file mode 100644
index 0000000..9e22cbc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Asks datanode to create a pipeline.
+ */
+public class CreatePipelineCommand
+    extends SCMCommand<CreatePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+  private final ReplicationFactor factor;
+  private final ReplicationType type;
+  private final List<DatanodeDetails> nodelist;
+
+  public CreatePipelineCommand(final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super();
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  @Override
+  public CreatePipelineCommandProto getProto() {
+    return CreatePipelineCommandProto.newBuilder()
+        .setCmdId(getId())
+        .setPipelineID(pipelineID.getProtobuf())
+        .setFactor(factor)
+        .setType(type)
+        .addAllDatanode(nodelist.stream()
+            .map(DatanodeDetails::getProtoBufMessage)
+            .collect(Collectors.toList()))
+        .build();
+  }
+
+  public static CreatePipelineCommand getFromProtobuf(
+      CreatePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    return new CreatePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()),
+        createPipelineProto.getType(), createPipelineProto.getFactor(),
+        createPipelineProto.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index a975cd5..c8ef6c1 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -282,6 +282,8 @@ message SCMCommandProto {
     closeContainerCommand = 3;
     deleteContainerCommand = 4;
     replicateContainerCommand = 5;
+    createPipelineCommand = 6;
+    closePipelineCommand = 7;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -290,6 +292,8 @@ message SCMCommandProto {
   optional CloseContainerCommandProto closeContainerCommandProto = 4;
   optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
   optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
+  optional CreatePipelineCommandProto createPipelineCommandProto = 7;
+  optional ClosePipelineCommandProto closePipelineCommandProto = 8;
 }
 
 /**
@@ -359,6 +363,25 @@ message ReplicateContainerCommandProto {
 }
 
 /**
+This command asks the datanode to create a pipeline.
+*/
+message CreatePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required ReplicationType type = 2;
+  required ReplicationFactor factor = 3;
+  repeated DatanodeDetailsProto datanode = 4;
+  required int64 cmdId = 5;
+}
+
+/**
+This command asks the datanode to close a pipeline.
+*/
+message ClosePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required int64 cmdId = 2;
+}
+
+/**
  * Protocol used from a datanode to StorageContainerManager.
  *
  * Please see the request and response messages for details of the RPC calls.
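
For reference, a client of the generated API must populate every required or
repeated field of the new messages; a minimal sketch of assembling a
CreatePipelineCommandProto (mirroring CreatePipelineCommand.getProto() above;
the helper class and parameter names are illustrative):

    import java.util.List;

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;

    final class CreatePipelineProtoSketch {
      static CreatePipelineCommandProto build(HddsProtos.PipelineID id,
          List<HddsProtos.DatanodeDetailsProto> members, long cmdId) {
        // All five fields of the message must be set before build().
        return CreatePipelineCommandProto.newBuilder()
            .setCmdId(cmdId)
            .setPipelineID(id)
            .setType(HddsProtos.ReplicationType.RATIS)
            .setFactor(HddsProtos.ReplicationFactor.THREE)
            .addAllDatanode(members)
            .build();
      }
    }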
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 4c182c3..9e815c8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import javax.management.ObjectName;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -190,6 +191,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+          // wait until pipeline is ready
+          pipelineManager.waitPipelineReady(pipeline.getId(), 0);
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " 
+
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7dde8d7..78a944f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -249,18 +249,19 @@ public class ContainerStateManager {
       throws IOException {
 
     Pipeline pipeline;
-    try {
-      // TODO: #CLUTIL remove creation logic when all replication types and
-      // factors are handled by pipeline creator job.
-      pipeline = pipelineManager.createPipeline(type, replicationFactor);
-    } catch (IOException e) {
-      final List<Pipeline> pipelines = pipelineManager
-          .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
-      if (pipelines.isEmpty()) {
+    final List<Pipeline> pipelines = pipelineManager
+        .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
+    if (pipelines.isEmpty()) {
+      try {
+        pipeline = pipelineManager.createPipeline(type, replicationFactor);
+        pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+      } catch (IOException e) {
+        LOG.error("Fail to create pipeline for " + e.getMessage());
         throw new IOException("Could not allocate container. Cannot get any" +
             " matching pipeline for Type:" + type +
             ", Factor:" + replicationFactor + ", State:PipelineState.OPEN");
       }
+    } else {
       pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
     }
     synchronized (pipeline) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..6de05fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -97,15 +98,14 @@ public final class SCMEvents {
          new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
 
   /**
-   * PipelineReport processed by pipeline report handler. This event is
+   * Open pipeline event sent by PipelineReportHandler. This event is
    * received by HealthyPipelineSafeModeRule.
    */
-  public static final TypedEvent<PipelineReportFromDatanode>
-      PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
-          PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
+  public static final TypedEvent<Pipeline>
+      OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline");
 
   /**
-   * PipelineActions are sent by Datanode. This event is received by
+   * PipelineActions are sent by Datanode to close a pipeline. It's received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
    */
   public static final TypedEvent<PipelineActionsFromDatanode>
@@ -113,7 +113,7 @@ public final class SCMEvents {
       "Pipeline_Actions");
 
   /**
-   * A Command status report will be sent by datanodes. This repoort is received
+   * A Command status report will be sent by datanodes. This report is received
    * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
    */
   public static final TypedEvent<CommandStatusReportFromDatanode>
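
Consumers pick up the new OPEN_PIPELINE event through the usual HDDS
event-queue subscription, as HealthyPipelineSafeModeRule does after this
change; a minimal sketch of a listener (the class name is illustrative):

    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.hdds.server.events.EventHandler;
    import org.apache.hadoop.hdds.server.events.EventPublisher;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    final class OpenPipelineListenerSketch implements EventHandler<Pipeline> {

      @Override
      public void onMessage(Pipeline pipeline, EventPublisher publisher) {
        // e.g. count the pipeline toward the safe-mode exit thresholds
        System.out.println("Pipeline opened: " + pipeline.getId());
      }

      public static void main(String[] args) {
        EventQueue queue = new EventQueue();
        queue.addHandler(SCMEvents.OPEN_PIPELINE,
            new OpenPipelineListenerSketch());
      }
    }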
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 77e037a..86ad5ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -39,12 +40,13 @@ public final class PipelineFactory {
   private Map<ReplicationType, PipelineProvider> providers;
 
   PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
-      Configuration conf, GrpcTlsConfig tlsConfig) {
+      Configuration conf, EventPublisher eventPublisher) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
         new SimplePipelineProvider(nodeManager));
     providers.put(ReplicationType.RATIS,
-        new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
+        new RatisPipelineProvider(nodeManager, stateManager, conf,
+            eventPublisher));
   }
 
   @VisibleForTesting
@@ -63,6 +65,11 @@ public final class PipelineFactory {
     return providers.get(type).create(factor, nodes);
   }
 
+  public void close(ReplicationType type, Pipeline pipeline)
+      throws IOException {
+    providers.get(type).close(pipeline);
+  }
+
   public void shutdown() {
     providers.values().forEach(provider -> provider.shutdown());
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..779008f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.ratis.grpc.GrpcTlsConfig;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -51,6 +50,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
       ReplicationFactor factor);
 
   List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state);
+
+  List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
 
   List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
@@ -95,5 +97,14 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
    */
   void deactivatePipeline(PipelineID pipelineID) throws IOException;
 
-  GrpcTlsConfig getGrpcTlsConfig();
+  /**
+   * Wait for a pipeline to become OPEN.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout in milliseconds; if 0, use the default
+   * @throws IOException in case of any exception, such as a timeout
+   */
+  default void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+  }
 }
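
The default here is a no-op; SCMPipelineManager supplies the real
implementation (its hunk is not fully shown in this mail). A hypothetical
method-level polling sketch of the contract, with an assumed default timeout
and poll interval:

    // Hypothetical sketch, not the committed implementation.
    public void waitPipelineReady(PipelineID pipelineID, long timeout)
        throws IOException {
      long limit = (timeout == 0) ? 60_000L : timeout;  // assumed default
      long deadline = System.currentTimeMillis() + limit;
      while (System.currentTimeMillis() < deadline) {
        if (getPipeline(pipelineID).isOpen()) {
          return;
        }
        try {
          Thread.sleep(100);                            // assumed interval
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted waiting for pipeline "
              + pipelineID, e);
        }
      }
      throw new IOException("Pipeline " + pipelineID
          + " did not become OPEN within " + limit + " ms");
    }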
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index a0ce216..c00ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,7 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
+  void close(Pipeline pipeline) throws IOException;
+
   void shutdown();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 793f4e2..57e59b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * Handles Pipeline Reports from datanode.
@@ -52,17 +53,14 @@ public class PipelineReportHandler implements
   private final boolean pipelineAvailabilityCheck;
 
   public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
-      PipelineManager pipelineManager,
-      Configuration conf) {
+      PipelineManager pipelineManager, Configuration conf) {
     Preconditions.checkNotNull(pipelineManager);
-    Objects.requireNonNull(scmSafeModeManager);
     this.scmSafeModeManager = scmSafeModeManager;
     this.pipelineManager = pipelineManager;
     this.conf = conf;
     this.pipelineAvailabilityCheck = conf.getBoolean(
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
-
   }
 
   @Override
@@ -79,38 +77,40 @@ public class PipelineReportHandler implements
     }
     for (PipelineReport report : pipelineReport.getPipelineReportList()) {
       try {
-        processPipelineReport(report, dn);
+        processPipelineReport(report, dn, publisher);
       } catch (IOException e) {
         LOGGER.error("Could not process pipeline report={} from dn={} {}",
             report, dn, e);
       }
     }
-    if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
-      publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          pipelineReportFromDatanode);
-    }
-
   }
 
-  private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
-      throws IOException {
+  private void processPipelineReport(PipelineReport report, DatanodeDetails dn,
+      EventPublisher publisher) throws IOException {
     PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
     Pipeline pipeline;
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
-          pipelineManager.getGrpcTlsConfig());
+      final ClosePipelineCommand closeCommand =
+          new ClosePipelineCommand(pipelineID);
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(dn.getUuid(), closeCommand);
+      publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
       return;
     }
 
     if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-      LOGGER.info("Pipeline {} reported by {}", pipeline.getId(), dn);
+      LOGGER.info("Pipeline {} {} reported by {}", pipeline.getFactor(),
+          pipeline.getId(), dn);
       pipeline.reportDatanode(dn);
       if (pipeline.isHealthy()) {
         // if all the dns have reported, pipeline can be moved to OPEN state
         pipelineManager.openPipeline(pipelineID);
       }
+      if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+        publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+      }
     } else {
       // In OPEN state case just report the datanode
       pipeline.reportDatanode(dn);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 7615057..93fbbd1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -131,9 +131,9 @@ class PipelineStateManager {
       throw new IOException("Closed pipeline can not be opened");
     }
     if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
+      LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
       pipeline = pipelineStateMap
           .updatePipelineState(pipelineId, PipelineState.OPEN);
-      LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
     }
     return pipeline;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 0324a58..126b0c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,37 +24,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -69,6 +60,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final EventPublisher eventPublisher;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -83,15 +75,14 @@ public class RatisPipelineProvider implements PipelineProvider {
 
   private final ForkJoinPool forkJoinPool = new ForkJoinPool(
       parallelismForPool, factory, null, false);
-  private final GrpcTlsConfig tlsConfig;
 
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf,
-      GrpcTlsConfig tlsConfig) {
+      EventPublisher eventPublisher) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
-    this.tlsConfig = tlsConfig;
+    this.eventPublisher = eventPublisher;
   }
 
 
@@ -155,12 +146,25 @@ public class RatisPipelineProvider implements PipelineProvider {
 
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.OPEN)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
         .build();
-    initializePipeline(pipeline);
+
+    // Send command to datanodes to create pipeline
+    final CreatePipelineCommand createCommand =
+        new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+            factor, dns);
+
+    dns.stream().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), createCommand);
+      LOG.info("Send pipeline:{} create command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
+
     return pipeline;
   }
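
create() now returns the pipeline in ALLOCATED state and leaves the actual
Ratis group creation to the datanodes. A rough sketch of the resulting call
pattern, using only names from this patch (the comments describe the
intended flow, not code in this file):

    // Sketch: asynchronous pipeline creation under this patch.
    Pipeline pipeline = pipelineManager.createPipeline(
        ReplicationType.RATIS, ReplicationFactor.THREE); // ALLOCATED so far
    // Datanodes create the Raft group and report back via heartbeat;
    // PipelineReportHandler opens the pipeline once all nodes reported.
    pipelineManager.waitPipelineReady(pipeline.getId(), 0); // 0 = default
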
 
@@ -188,69 +192,23 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
   }
 
-  protected void initializePipeline(Pipeline pipeline) throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
-    }
-    callRatisRpc(pipeline.getNodes(),
-        (raftClient, peer) -> {
-          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
-          if (reply == null || !reply.isSuccess()) {
-            String msg = "Pipeline initialization failed for pipeline:"
-                + pipeline.getId() + " node:" + peer.getId();
-            LOG.error(msg);
-            throw new IOException(msg);
-          }
-        });
-  }
-
-  private void callRatisRpc(List<DatanodeDetails> datanodes,
-      CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    final String rpcType = conf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
-    final List< IOException > exceptions =
-        Collections.synchronizedList(new ArrayList<>());
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(conf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(conf);
-    try {
-      forkJoinPool.submit(() -> {
-        datanodes.parallelStream().forEach(d -> {
-          final RaftPeer p = RatisHelper.toRaftPeer(d);
-          try (RaftClient client = RatisHelper
-              .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-                  retryPolicy, maxOutstandingRequests, tlsConfig,
-                  requestTimeout)) {
-            rpc.accept(client, p);
-          } catch (IOException ioe) {
-            String errMsg =
-                "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
-            LOG.error(errMsg, ioe);
-            exceptions.add(new IOException(errMsg, ioe));
-          }
-        });
-      }).get();
-    } catch (ExecutionException | RejectedExecutionException ex) {
-      LOG.error(ex.getClass().getName() + " exception occurred during " +
-          "createPipeline", ex);
-      throw new IOException(ex.getClass().getName() + " exception occurred " +
-          "during createPipeline", ex);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupt exception occurred during " +
-          "createPipeline", ex);
-    }
-    if (!exceptions.isEmpty()) {
-      throw MultipleIOException.createIOException(exceptions);
-    }
+  /**
+   * Closes the pipeline by sending a close command to destroy it on all
+   * the datanodes.
+   *
+   * @param pipeline - Pipeline to be destroyed
+   */
+  public void close(Pipeline pipeline) {
+    LOG.debug("Destroy pipeline:{}", pipeline.getId());
+    final ClosePipelineCommand closeCommand =
+        new ClosePipelineCommand(pipeline.getId());
+    pipeline.getNodes().stream().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), closeCommand);
+      LOG.info("Send pipeline:{} close command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
deleted file mode 100644
index 20fa092..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
- */
-public final class RatisPipelineUtils {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisPipelineUtils.class);
-
-  private RatisPipelineUtils() {
-  }
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone configuration
-   * @param grpcTlsConfig
-   * @throws IOException
-   */
-  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
-      GrpcTlsConfig grpcTlsConfig) {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    }
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
-      } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
-            pipeline.getId(), dn);
-      }
-    }
-  }
-
-  /**
-   * Sends ratis command to destroy pipeline on the given datanode.
-   *
-   * @param dn         - Datanode on which pipeline needs to be destroyed
-   * @param pipelineID - ID of pipeline to be destroyed
-   * @param ozoneConf  - Ozone configuration
-   * @param grpcTlsConfig - grpc tls configuration
-   * @throws IOException
-   */
-  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final RaftPeer p = RatisHelper.toRaftPeer(dn);
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(ozoneConf);
-    try(RaftClient client = RatisHelper
-        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, maxOutstandingRequests, grpcTlsConfig,
-            requestTimeout)) {
-      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
-          true, p.getId());
-    }
-  }
-}
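
The groupRemove() call deleted above does not disappear: it moves to the
datanode side, which now handles the ClosePipelineCommand introduced by this
patch. A hedged sketch of that handler's core, based only on the Ratis
client usage shown in the deleted code (dn, pipelineID, rpcType,
retryPolicy, maxOutstandingRequests, tlsConfig and requestTimeout are
assumed to be in scope):

    // Sketch: a datanode removes its own Raft group on a close command.
    final RaftPeer peer = RatisHelper.toRaftPeer(dn);
    try (RaftClient client = RatisHelper.newRaftClient(
        SupportedRpcType.valueOfIgnoreCase(rpcType), peer,
        retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
          true, peer.getId());
    }
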
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..c77b88b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -37,7 +38,7 @@ import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.MetadataStore;
 import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
 import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@ import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -81,18 +83,21 @@ public class SCMPipelineManager implements PipelineManager {
   private final NodeManager nodeManager;
   private final SCMPipelineMetrics metrics;
   private final Configuration conf;
+  private boolean pipelineAvailabilityCheck;
+  private boolean createPipelineInSafemode;
+  private Set<PipelineID> oldRatisThreeFactorPipelineIDSet = new HashSet<>();
+  private long pipelineWaitDefaultTimeout;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
-  private GrpcTlsConfig grpcTlsConfig;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
-      EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+      EventPublisher eventPublisher)
       throws IOException {
     this.lock = new ReentrantReadWriteLock();
     this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
-        conf, grpcTlsConfig);
+        conf, eventPublisher);
     // TODO: See if thread priority needs to be set for these threads
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
@@ -113,8 +118,17 @@ public class SCMPipelineManager implements PipelineManager {
     this.metrics = SCMPipelineMetrics.create();
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
+    this.pipelineAvailabilityCheck = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
+    this.createPipelineInSafemode = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
     initializePipelineState();
-    this.grpcTlsConfig = grpcTlsConfig;
   }
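
All of the new behavior is configuration driven. A minimal sketch of turning
it on programmatically, using only the HddsConfigKeys constants referenced
in this constructor (nodeManager and eventQueue are assumed to exist):

    // Sketch: enable the safe mode pipeline check and in-safemode creation.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(
        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, true);
    SCMPipelineManager manager =
        new SCMPipelineManager(conf, nodeManager, eventQueue);
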
 
   public PipelineStateManager getStateManager() {
@@ -127,9 +141,16 @@ public class SCMPipelineManager implements PipelineManager {
     pipelineFactory.setProvider(replicationType, provider);
   }
 
+  public Set<PipelineID> getOldPipelineIdSet() {
+    return oldRatisThreeFactorPipelineIDSet;
+  }
+
   private void initializePipelineState() throws IOException {
     if (pipelineStore.isEmpty()) {
       LOG.info("No pipeline exists in current db");
+      if (pipelineAvailabilityCheck && createPipelineInSafemode) {
+        startPipelineCreator();
+      }
       return;
     }
     List<Map.Entry<byte[], byte[]>> pipelines =
@@ -144,12 +165,16 @@ public class SCMPipelineManager implements PipelineManager {
       Preconditions.checkNotNull(pipeline);
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
+      if (pipeline.getType() == ReplicationType.RATIS &&
+          pipeline.getFactor() == ReplicationFactor.THREE) {
+        oldRatisThreeFactorPipelineIDSet.add(pipeline.getId());
+      }
     }
   }
 
   @Override
-  public synchronized Pipeline createPipeline(
-      ReplicationType type, ReplicationFactor factor) throws IOException {
+  public synchronized Pipeline createPipeline(ReplicationType type,
+      ReplicationFactor factor) throws IOException {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -157,8 +182,11 @@ public class SCMPipelineManager implements PipelineManager {
           pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
+      metrics.incNumPipelineAllocated();
+      if (pipeline.isOpen()) {
+        metrics.incNumPipelineCreated();
+        metrics.createPerPipelineMetrics(pipeline);
+      }
       return pipeline;
     } catch (InsufficientDatanodesException idEx) {
       throw idEx;
@@ -225,6 +253,16 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  public List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state) {
@@ -293,6 +331,7 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = stateManager.openPipeline(pipelineId);
+      metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
     } finally {
       lock.writeLock().unlock();
@@ -380,6 +419,44 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
+   * Wait for a pipeline to become OPEN.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout in milliseconds
+   * @throws IOException in case of any failure, such as a timeout
+   */
+  @Override
+  public void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+    Pipeline pipeline;
+    try {
+      pipeline = stateManager.getPipeline(pipelineID);
+    } catch (PipelineNotFoundException e) {
+      throw new IOException(String.format("Pipeline %s cannot be found",
+          pipelineID));
+    }
+
+    boolean ready;
+    long st = Time.monotonicNow();
+    if (timeout == 0) {
+      timeout = pipelineWaitDefaultTimeout;
+    }
+    for (ready = pipeline.isOpen();
+        !ready && Time.monotonicNow() - st < timeout;
+        ready = pipeline.isOpen()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status instead of swallowing it
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (!ready) {
+      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
+          pipelineID, timeout));
+    }
+  }
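
waitPipelineReady() gives synchronous SCM-side callers a way to block until
the asynchronous creation completes. A usage sketch, assuming a
pipelineManager reference; a zero timeout falls back to the pipeline report
interval as coded above:

    // Sketch: wait for the pipeline to leave ALLOCATED before using it.
    try {
      pipelineManager.waitPipelineReady(pipeline.getId(), 0);
      // The pipeline is OPEN; it is safe to allocate on it here.
    } catch (IOException e) {
      // Not ready in time; the caller may retry or pick another pipeline.
    }
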
+
+  /**
    * Moves the pipeline to CLOSED state and sends close container command for
    * all the containers in the pipeline.
    *
@@ -408,7 +485,7 @@ public class SCMPipelineManager implements PipelineManager {
    * @throws IOException
    */
   private void destroyPipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
+    pipelineFactory.close(pipeline.getType(), pipeline);
     // remove the pipeline from the pipeline manager
     removePipeline(pipeline.getId());
     triggerPipelineCreation();
@@ -441,11 +518,6 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public GrpcTlsConfig getGrpcTlsConfig() {
-    return grpcTlsConfig;
-  }
-
-  @Override
   public void close() throws IOException {
     if (scheduler != null) {
       scheduler.close();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d0f7f6e..cd80010 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -46,6 +46,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
 
   private MetricsRegistry registry;
 
+  private @Metric MutableCounterLong numPipelineAllocated;
   private @Metric MutableCounterLong numPipelineCreated;
   private @Metric MutableCounterLong numPipelineCreationFailed;
   private @Metric MutableCounterLong numPipelineDestroyed;
@@ -83,6 +84,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
   @SuppressWarnings("SuspiciousMethodCalls")
   public void getMetrics(MetricsCollector collector, boolean all) {
     MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+    numPipelineAllocated.snapshot(recordBuilder, true);
     numPipelineCreated.snapshot(recordBuilder, true);
     numPipelineCreationFailed.snapshot(recordBuilder, true);
     numPipelineDestroyed.snapshot(recordBuilder, true);
@@ -117,6 +119,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   /**
+   * Increments the count of pipeline allocations, including both
+   * succeeded and failed ones.
+   */
+  void incNumPipelineAllocated() {
+    numPipelineAllocated.incr();
+  }
+
+  /**
    * Increments number of successful pipeline creation count.
    */
   void incNumPipelineCreated() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa..00cb7ae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider {
   }
 
   @Override
+  public void close(Pipeline pipeline) throws IOException {
+    // Do nothing.
+  }
+
+  @Override
   public void shutdown() {
     // Do nothing.
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d76..28f1200 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,17 +20,10 @@ package org.apache.hadoop.hdds.scm.safemode;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -38,9 +31,6 @@ import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * Class defining Safe mode exit criteria for Pipelines.
  *
@@ -49,43 +39,52 @@ import java.util.Set;
  * through in a cluster.
  */
 public class HealthyPipelineSafeModeRule
-    extends SafeModeExitRule<PipelineReportFromDatanode>{
+    extends SafeModeExitRule<Pipeline>{
 
   public static final Logger LOG =
       LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
-  private final PipelineManager pipelineManager;
   private final int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Set<DatanodeDetails> processedDatanodeDetails =
-      new HashSet<>();
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
       SCMSafeModeManager manager, Configuration configuration) {
     super(manager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
     double healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
             HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
 
+    int minHealthyPipelines = 0;
+
+    boolean createPipelineInSafemode = configuration.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+    if (createPipelineInSafemode) {
+      minHealthyPipelines =
+          configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE,
+              HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT);
+    }
+
     Preconditions.checkArgument(
         (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
         HddsConfigKeys.
             HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
             + " value should be >= 0.0 and <= 1.0");
 
-    // As we want to wait for 3 node pipelines
-    int pipelineCount =
+    // Wait for all RATIS write pipelines, whether of factor ONE or THREE
+    int pipelineCount = pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS, Pipeline.PipelineState.OPEN).size() +
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+            Pipeline.PipelineState.ALLOCATED).size();
 
     // This value will be zero when pipeline count is 0.
     // On a fresh installed cluster, there will be zero pipelines in the SCM
     // pipeline DB.
-    healthyPipelineThresholdCount =
-        (int) Math.ceil(healthyPipelinesPercent * pipelineCount);
+    healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+        (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
 
     LOG.info(" Total pipeline count is {}, healthy pipeline " +
         "threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
@@ -95,8 +94,8 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
@@ -108,54 +107,27 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
+  protected void process(Pipeline pipeline) {
 
     // When SCM is in safe mode for long time, already registered
-    // datanode can send pipeline report again, then pipeline handler fires
-    // processed report event, we should not consider this pipeline report
-    // from datanode again during threshold calculation.
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
-    if (!processedDatanodeDetails.contains(
-        pipelineReportFromDatanode.getDatanodeDetails())) {
-
-      Pipeline pipeline;
-      PipelineReportsProto pipelineReport =
-          pipelineReportFromDatanode.getReport();
-
-      for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-        PipelineID pipelineID = PipelineID
-            .getFromProtobuf(report.getPipelineID());
-        try {
-          pipeline = pipelineManager.getPipeline(pipelineID);
-        } catch (PipelineNotFoundException e) {
-          continue;
-        }
-
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-            pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
-          // If the pipeline is open state mean, all 3 datanodes are reported
-          // for this pipeline.
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-        }
-      }
-      if (scmInSafeMode()) {
-        SCMSafeModeManager.getLogger().info(
-            "SCM in safe mode. Healthy pipelines reported count is {}, " +
-                "required healthy pipeline reported count is {}",
-            currentHealthyPipelineCount, healthyPipelineThresholdCount);
-      }
-
-      processedDatanodeDetails.add(dnDetails);
+    // datanodes can send pipeline reports again, or SCMPipelineManager may
+    // create new pipelines.
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS) {
+      getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+      currentHealthyPipelineCount++;
     }
 
+    if (scmInSafeMode()) {
+      SCMSafeModeManager.getLogger().info(
+          "SCM in safe mode. Healthy pipelines reported count is {}, " +
+              "required healthy pipeline reported count is {}",
+          currentHealthyPipelineCount, healthyPipelineThresholdCount);
+    }
   }
 
   @Override
   protected void cleanup() {
-    processedDatanodeDetails.clear();
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 841d8ff..abf012d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,17 +22,11 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
-    PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
@@ -47,22 +41,24 @@ import java.util.Set;
  * replica available for read when we exit safe mode.
  */
 public class OneReplicaPipelineSafeModeRule extends
-    SafeModeExitRule<PipelineReportFromDatanode> {
+    SafeModeExitRule<Pipeline> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
 
   private int thresholdCount;
   private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
-  private final PipelineManager pipelineManager;
-  private int currentReportedPipelineCount = 0;
+  private Set<PipelineID> oldPipelineIDSet;
+  private int oldPipelineReportedCount = 0;
+  private int oldPipelineThresholdCount = 0;
+  private int newPipelineThresholdCount = 0;
+  private int newPipelineReportedCount = 0;
 
 
   public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
       SCMSafeModeManager safeModeManager, Configuration configuration) {
     super(safeModeManager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
 
     double percent =
         configuration.getDouble(
@@ -75,69 +71,68 @@ public class OneReplicaPipelineSafeModeRule extends
             HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
+    oldPipelineIDSet =
+        ((SCMPipelineManager)pipelineManager).getOldPipelineIdSet();
     int totalPipelineCount =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+        HddsProtos.ReplicationFactor.THREE).size();
+    Preconditions.checkState(totalPipelineCount >= oldPipelineIDSet.size());
 
-    thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
+    oldPipelineThresholdCount =
+        (int) Math.ceil(percent * oldPipelineIDSet.size());
+    newPipelineThresholdCount = (int) Math.ceil(
+        percent * (totalPipelineCount - oldPipelineIDSet.size()));
 
-    LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
+    thresholdCount = oldPipelineThresholdCount + newPipelineThresholdCount;
+
+    LOG.info("Total pipeline count is {}, pipeline's with at least one " +
         "datanode reported threshold count is {}", totalPipelineCount,
         thresholdCount);
 
     getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold(
         thresholdCount);
-
   }
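
The single threshold is now split between pipelines reloaded from the DB
("old") and pipelines created during the current safe mode window ("new").
A worked sketch with illustrative numbers that are not from the patch:

    // Illustrative numbers only:
    double percent = 0.50;        // one-replica reported pct from config
    int totalPipelineCount = 10;  // factor-THREE RATIS pipelines in all
    int oldCount = 6;             // reloaded from the pipeline DB at startup
    int oldThreshold = (int) Math.ceil(percent * oldCount);                 // 3
    int newThreshold = (int) Math.ceil(percent * (totalPipelineCount - oldCount)); // 2
    int threshold = oldThreshold + newThreshold;                            // 5
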
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
   protected boolean validate() {
-    if (currentReportedPipelineCount >= thresholdCount) {
+    if (newPipelineReportedCount + oldPipelineReportedCount >= thresholdCount) {
       return true;
     }
     return false;
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
-    Pipeline pipeline;
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID
-          .getFromProtobuf(report.getPipelineID());
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-          !reportedPipelineIDSet.contains(pipelineID)) {
-        reportedPipelineIDSet.add(pipelineID);
+  protected void process(Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+        !reportedPipelineIDSet.contains(pipeline.getId())) {
+      if (oldPipelineIDSet.contains(pipeline.getId()) &&
+          oldPipelineReportedCount < oldPipelineThresholdCount) {
         getSafeModeMetrics()
             .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+        oldPipelineReportedCount++;
+        reportedPipelineIDSet.add(pipeline.getId());
+      } else if (newPipelineReportedCount < newPipelineThresholdCount) {
+        getSafeModeMetrics()
+            .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+        newPipelineReportedCount++;
+        reportedPipelineIDSet.add(pipeline.getId());
       }
     }
 
-    currentReportedPipelineCount = reportedPipelineIDSet.size();
-
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. Pipelines with atleast one datanode reported " +
-              "count is {}, required atleast one datanode reported per " +
+          "SCM in safe mode. Pipelines with at least one datanode reported " +
+              "count is {}, required at least one datanode reported per " +
               "pipeline count is {}",
-          currentReportedPipelineCount, thresholdCount);
+          newPipelineReportedCount + oldPipelineReportedCount, thresholdCount);
     }
-
   }
 
   @Override
@@ -152,7 +147,6 @@ public class OneReplicaPipelineSafeModeRule extends
 
   @VisibleForTesting
   public int getCurrentReportedPipelineCount() {
-    return currentReportedPipelineCount;
+    return newPipelineReportedCount + oldPipelineReportedCount;
   }
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a22d162..161963e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -59,17 +60,17 @@ import org.slf4j.LoggerFactory;
  * number of datanode registered is met or not.
  *
  * 3. HealthyPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and check if pipeline is healthy
  * and increments current healthy pipeline count. Then validate it cutoff
  * threshold for healthy pipeline is met or not.
  *
  * 4. OneReplicaPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and add the reported pipeline to
  * reported pipeline set. Then validate it cutoff threshold for one replica
  * per pipeline is met or not.
@@ -97,6 +98,7 @@ public class SCMSafeModeManager {
   private final PipelineManager pipelineManager;
 
   private final SafeModeMetrics safeModeMetrics;
+  private boolean createPipelineInSafeMode = false;
 
   public SCMSafeModeManager(Configuration conf,
       List<ContainerInfo> allContainers, PipelineManager pipelineManager,
@@ -134,6 +136,9 @@ public class SCMSafeModeManager {
         exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
             oneReplicaPipelineSafeModeRule);
       }
+      createPipelineInSafeMode = conf.getBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
       emitSafeModeStatus();
     } else {
       this.safeModeMetrics = null;
@@ -166,6 +171,7 @@ public class SCMSafeModeManager {
 
     if (exitRules.get(ruleName) != null) {
       validatedRules.add(ruleName);
+      LOG.info("{} rule is successfully validated", ruleName);
     } else {
       // This should never happen
       LOG.error("No Such Exit rule {}", ruleName);
@@ -190,6 +196,18 @@ public class SCMSafeModeManager {
    */
   @VisibleForTesting
   public void exitSafeMode(EventPublisher eventQueue) {
+    // Wait a while so that as many new pipelines as possible become ready
+    if (createPipelineInSafeMode) {
+      long sleepTime = config.getTimeDuration(
+          HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+          HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status instead of swallowing it
+        Thread.currentThread().interrupt();
+      }
+    }
+
     LOG.info("SCM exiting safe mode.");
     setInSafeMode(false);
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index b9e5333..6128266 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -129,7 +129,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
     List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
     pipelineList.forEach((pipeline) -> {
       try {
-        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
+        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+            pipeline.isAllocationTimeout()) {
           scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
         }
       } catch (IOException ex) {
@@ -142,6 +143,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
   public boolean getSafeModeStatus() {
     return isInSafeMode.get();
   }
-
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 9f6077b..3dbb4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
       }
 
       if (heartbeat.getCommandStatusReportsCount() != 0) {
+        LOG.debug("Dispatching Command Status Report.");
         for (CommandStatusReportsProto commandStatusReport : heartbeat
             .getCommandStatusReportsList()) {
           eventPublisher.fireEvent(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 530c0a6..901bc2c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -79,6 +81,12 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .createPipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .closePipelineCommand;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -329,6 +337,18 @@ public class SCMDatanodeProtocolServer implements
           .setReplicateContainerCommandProto(
               ((ReplicateContainerCommand)cmd).getProto())
           .build();
+    case createPipelineCommand:
+      return builder
+          .setCommandType(createPipelineCommand)
+          .setCreatePipelineCommandProto(
+              ((CreatePipelineCommand)cmd).getProto())
+          .build();
+    case closePipelineCommand:
+      return builder
+          .setCommandType(closePipelineCommand)
+          .setClosePipelineCommandProto(
+              ((ClosePipelineCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Scm command " +
           cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index af65e13..c45c15d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -402,8 +402,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       pipelineManager = configurator.getPipelineManager();
     } else {
       pipelineManager =
-          new SCMPipelineManager(conf, scmNodeManager, eventQueue,
-              grpcTlsConfig);
+          new SCMPipelineManager(conf, scmNodeManager, eventQueue);
     }
 
     if (configurator.getContainerManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 37321d7..8d71fe5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -360,6 +362,17 @@ public final class TestUtils {
     return new PipelineReportFromDatanode(dn, reportBuilder.build());
   }
 
+  public static void openAllRatisPipelines(PipelineManager pipelineManager)
+      throws IOException {
+    // Pipeline is created by background thread
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
+    // Open each pipeline to move it from ALLOCATED to OPEN state
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.openPipeline(pipeline.getId());
+    }
+  }
+
   public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
       DatanodeDetails dn, PipelineID... pipelineIDs) {
     PipelineActionsProto.Builder actionsProtoBuilder =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index a8364a4..b27f8a3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,7 +24,9 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -67,7 +69,7 @@ public class TestCloseContainerEventHandler {
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(configuration, nodeManager, eventQueue, null);
+        new SCMPipelineManager(configuration, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), configuration);
@@ -80,6 +82,8 @@ public class TestCloseContainerEventHandler {
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(pipelineManager, containerManager));
     eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
+    // Move all pipelines created by the background thread from ALLOCATED to OPEN
+    TestUtils.openAllRatisPipelines(pipelineManager);
   }
 
   @AfterClass
@@ -115,7 +119,9 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithValidContainers() throws IOException {
-
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
+    pipelineManager.openPipeline(pipeline.getId());
     ContainerInfo container = containerManager
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, "ozone");
@@ -133,7 +139,9 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithRatis() throws IOException {
-
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    pipelineManager.openPipeline(pipeline.getId());
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(CloseContainerEventHandler.LOG);
     ContainerInfo container = containerManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 75a1ad3..c203173 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -94,7 +94,7 @@ public class TestSCMContainerManager {
     }
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     containerManager = new SCMContainerManager(conf, nodeManager,
         pipelineManager, new EventQueue());
     xceiverClientManager = new XceiverClientManager(conf);
@@ -147,7 +147,7 @@ public class TestSCMContainerManager {
           containerInfo.getPipelineID()).getFirstNode()
           .getUuid());
     }
-    Assert.assertTrue(pipelineList.size() > 5);
+    Assert.assertTrue(pipelineList.size() >= 1);
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 26ffd8d..43c174e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -107,7 +107,7 @@ public class TestContainerPlacement {
     final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     PipelineManager pipelineManager =
-        new SCMPipelineManager(config, scmNodeManager, eventQueue, null);
+        new SCMPipelineManager(config, scmNodeManager, eventQueue);
     return new SCMContainerManager(config, scmNodeManager, pipelineManager,
         eventQueue);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54..14c24e0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -73,6 +73,7 @@ public class TestDeadNodeHandler {
   private SCMNodeManager nodeManager;
   private ContainerManager containerManager;
   private NodeReportHandler nodeReportHandler;
+  private SCMPipelineManager pipelineManager;
   private DeadNodeHandler deadNodeHandler;
   private EventPublisher publisher;
   private EventQueue eventQueue;
@@ -87,12 +88,12 @@ public class TestDeadNodeHandler {
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
-    SCMPipelineManager manager =
+    pipelineManager =
         (SCMPipelineManager)scm.getPipelineManager();
     PipelineProvider mockRatisProvider =
-        new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
-            conf);
-    manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = scm.getContainerManager();
     deadNodeHandler = new DeadNodeHandler(nodeManager,
@@ -147,6 +148,8 @@ public class TestDeadNodeHandler {
     nodeManager.register(TestUtils.randomDatanodeDetails(),
         TestUtils.createNodeReport(storageOne), null);
 
+    TestUtils.openAllRatisPipelines(pipelineManager);
+
     ContainerInfo container1 =
         TestUtils.allocateContainer(containerManager);
     ContainerInfo container2 =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index db76d66..f4eb797 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -121,6 +121,7 @@ public class TestSCMNodeManager {
         testDir.getAbsolutePath());
     conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -1035,9 +1036,11 @@ public class TestSCMNodeManager {
       eq.processAll(1000L);
       List<SCMCommand> command =
           nodemanager.processHeartbeat(datanodeDetails);
-      Assert.assertEquals(1, command.size());
-      Assert
-          .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+      // With the datanode registered, SCM may also queue a create pipeline
+      // command, so look for the close container command anywhere in the list
+      Assert.assertTrue(command.size() >= 1);
+      Assert.assertTrue(command.stream().anyMatch(
+          c -> c.getClass().equals(CloseContainerCommand.class)));
     } catch (IOException e) {
       e.printStackTrace();
       throw  e;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 01c53ba..28a3484 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 
 import java.io.IOException;
 
@@ -31,7 +32,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   public MockRatisPipelineProvider(NodeManager nodeManager,
                             PipelineStateManager stateManager,
                             Configuration conf) {
-    super(nodeManager, stateManager, conf, null);
+    super(nodeManager, stateManager, conf, new EventQueue());
   }
 
   protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 94c3039..3d9f05a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -68,10 +63,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
-
   @Test
   public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
 
@@ -113,10 +107,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
 
@@ -188,10 +182,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule {
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
 
 
-      // No datanodes have sent pipelinereport from datanode
+      // No pipeline event has been sent to SCMSafeModeManager yet
       Assert.assertFalse(healthyPipelineSafeModeRule.validate());
 
 
@@ -225,14 +220,14 @@ public class TestHealthyPipelineSafeModeRule {
           GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
               SCMSafeModeManager.class));
 
-      // fire event with pipeline report with ratis type and factor 1
-      // pipeline, validate() should return false
+      // fire open pipeline event for the ratis type, factor 1 pipeline;
+      // with one pipeline reported, validate() should now return true
       firePipelineEvent(pipeline1, eventQueue);
 
       GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
-          "reported count is 0"),
+          "reported count is 1"),
           1000, 5000);
-      Assert.assertFalse(healthyPipelineSafeModeRule.validate());
+      Assert.assertTrue(healthyPipelineSafeModeRule.validate());
 
       firePipelineEvent(pipeline2, eventQueue);
       firePipelineEvent(pipeline3, eventQueue);
@@ -246,19 +241,7 @@ public class TestHealthyPipelineSafeModeRule {
 
   }
 
-
   private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
-    PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-        .newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-
-    // Here no need to fire event from 3 nodes, as already pipeline is in
-    // open state, but doing it.
-    eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-            pipeline.getNodes().get(0), reportBuilder.build()));
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
-
 }
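
Safe mode rules are now driven by the OPEN_PIPELINE event instead of
per-datanode PROCESSED_PIPELINE_REPORT events, so a test fires exactly one
event per pipeline regardless of its replication factor. A minimal sketch of
the new driver pattern, assuming the pipelineManager, eventQueue and safe
mode rule fixtures set up in these tests (the rule variable and the timeout
values are illustrative):

    Pipeline pipeline = pipelineManager.createPipeline(
        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
    pipelineManager.openPipeline(pipeline.getId());  // ALLOCATED -> OPEN
    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE,
        pipelineManager.getPipeline(pipeline.getId()));
    // The rule under test should validate once enough pipelines are open.
    GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);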
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index ca54d05..0fa5eae 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule {
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
     ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         folder.newFolder().toString());
+    ozoneConfiguration.setBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     List<ContainerInfo> containers = new ArrayList<>();
     containers.addAll(HddsTestUtils.getContainerInfo(1));
@@ -71,7 +68,7 @@ public class TestOneReplicaPipelineSafeModeRule {
     eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
-            eventQueue, null);
+            eventQueue);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
@@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
 
@@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineCountThree - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
-
-
   private void createPipelines(int count,
       HddsProtos.ReplicationFactor factor) throws Exception {
     for (int i = 0; i < count; i++) {
@@ -184,26 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule {
   }
 
   private void firePipelineEvent(Pipeline pipeline) {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-
-    if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(1), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(2), reportBuilder.build()));
-    } else {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-    }
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 247b38a..b5839bc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -73,6 +70,8 @@ public class TestSCMSafeModeManager {
   public static void setUp() {
     queue = new EventQueue();
     config = new OzoneConfiguration();
+    config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        false);
   }
 
   @Test
@@ -177,7 +176,7 @@ public class TestSCMSafeModeManager {
         HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
     conf.setDouble(HddsConfigKeys.
         HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
-
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -199,7 +198,7 @@ public class TestSCMSafeModeManager {
           0.9);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -217,7 +216,7 @@ public class TestSCMSafeModeManager {
           200);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -234,7 +233,7 @@ public class TestSCMSafeModeManager {
       conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForSafeModePercent");
@@ -258,7 +257,7 @@ public class TestSCMSafeModeManager {
 
     MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
     SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
-        mockNodeManager, queue, null);
+        mockNodeManager, queue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
             pipelineManager.getStateManager(), config);
@@ -302,12 +301,12 @@ public class TestSCMSafeModeManager {
     // we shall get an event when the datanode is registered. In that case,
     // validate will return true, and add this to validatedRules.
     if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
-      firePipelineEvent(pipelines.get(0));
+      firePipelineEvent(pipelineManager, pipelines.get(0));
     }
 
     for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
-        oneReplicaThresholdCount); i++) {
-      firePipelineEvent(pipelines.get(i));
+        Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
+      firePipelineEvent(pipelineManager, pipelines.get(i));
 
       if (i < healthyPipelineThresholdCount) {
         checkHealthy(i + 1);
@@ -352,15 +351,11 @@ public class TestSCMSafeModeManager {
         1000,  5000);
   }
 
-  private void firePipelineEvent(Pipeline pipeline) throws Exception {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-    queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-            reportBuilder.build()));
+  private void firePipelineEvent(SCMPipelineManager pipelineManager,
+      Pipeline pipeline) throws Exception {
+    pipelineManager.openPipeline(pipeline.getId());
+    queue.fireEvent(SCMEvents.OPEN_PIPELINE,
+        pipelineManager.getPipeline(pipeline.getId()));
   }
 
 
@@ -479,7 +474,7 @@ public class TestSCMSafeModeManager {
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, queue, null);
+          nodeManager, queue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -490,10 +485,6 @@ public class TestSCMSafeModeManager {
       Pipeline pipeline = pipelineManager.createPipeline(
           HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
-      PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-          .newBuilder();
-      reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-          .setPipelineID(pipeline.getId().getProtobuf()));
 
       scmSafeModeManager = new SCMSafeModeManager(
           config, containers, pipelineManager, queue);
@@ -502,10 +493,9 @@ public class TestSCMSafeModeManager {
           HddsTestUtils.createNodeRegistrationContainerReport(containers));
       assertTrue(scmSafeModeManager.getInSafeMode());
 
-      // Trigger the processed pipeline report event
-      queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-              reportBuilder.build()));
+      firePipelineEvent(pipelineManager, pipeline);
 
       GenericTestUtils.waitFor(() -> {
         return !scmSafeModeManager.getInSafeMode();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index e4f1a37..378a1a6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -164,7 +164,9 @@ public class TestContainerStateManagerIntegration {
       }
     }
 
-    cluster.restartStorageContainerManager(true);
+    // Restarting SCM does not re-trigger container reports, so the safe mode
+    // exit rule cannot be satisfied; skip waiting for the cluster to be ready
+    cluster.restartStorageContainerManager(false);
 
     List<ContainerInfo> result = cluster.getStorageContainerManager()
         .getContainerManager().listContainer(null, 100);
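
Because cluster readiness now also waits for safe mode exit (see the
MiniOzoneClusterImpl change below), tests that cannot satisfy the exit rules
have two configuration levers, both used in this patch. A short sketch,
assuming an OzoneConfiguration handed to the test cluster (the comments
describe how this patch's tests use the keys, not their full semantics):

    OzoneConfiguration conf = new OzoneConfiguration();
    // Most unit tests in this patch disable pipeline creation during
    // safe mode:
    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
    // TestSCMPipelineManager instead disables the pipeline availability
    // check entirely:
    conf.setBoolean(
        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, false);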
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..21fa7bd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -81,7 +81,7 @@ public class TestPipelineClose {
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
         TimeUnit.MILLISECONDS);
-    pipelineDestroyTimeoutInMillis = 5000;
+    pipelineDestroyTimeoutInMillis = 1000;
     conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
         pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
     cluster.waitForClusterToBeReady();
@@ -169,7 +169,7 @@ public class TestPipelineClose {
         new PipelineActionHandler(pipelineManager, conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
-    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    Thread.sleep(5000);
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 4b3d5d6..924dce1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -45,7 +46,9 @@ public class TestRatisPipelineProvider {
   @Before
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
-    stateManager = new PipelineStateManager(new OzoneConfiguration());
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    stateManager = new PipelineStateManager(conf);
     provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, new OzoneConfiguration());
   }
@@ -57,7 +60,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-            Pipeline.PipelineState.OPEN);
+            Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
@@ -68,7 +71,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-            Pipeline.PipelineState.OPEN);
+            Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -80,7 +83,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -94,7 +97,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -189,7 +192,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
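
The flipped assertions above reflect the new lifecycle: a provider-created
pipeline starts in ALLOCATED and is only moved to OPEN once SCM has heard
back from its datanodes. A condensed sketch of the progression using the
fixtures from this test (the commented openPipeline call stands in for the
report-driven transition and is not part of this test):

    Pipeline pipeline = provider.create(HddsProtos.ReplicationFactor.THREE);
    // Freshly created pipelines are no longer OPEN:
    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
        pipeline.getPipelineState());
    stateManager.addPipeline(pipeline);
    // Once every member datanode reports the pipeline back, SCM opens it:
    // pipelineManager.openPipeline(pipeline.getId());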
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..5d934eb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -61,6 +63,8 @@ public class TestSCMPipelineManager {
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.set(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+        "false");
     boolean folderExisted = testDir.exists() || testDir.mkdirs();
     if (!folderExisted) {
       throw new IOException("Unable to create test directory path");
@@ -76,7 +80,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testPipelineReload() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -93,7 +97,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should be able to load the pipelines from the db
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -116,7 +120,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testRemovePipeline() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -134,7 +138,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should not be able to load removed pipelines
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     try {
       pipelineManager.getPipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been retrieved");
@@ -150,7 +154,7 @@ public class TestSCMPipelineManager {
   public void testPipelineReport() throws IOException {
     EventQueue eventQueue = new EventQueue();
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -165,10 +169,11 @@ public class TestSCMPipelineManager {
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+
     Assert
         .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
     Assert
-        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // get pipeline report from each dn in the pipeline
     PipelineReportHandler pipelineReportHandler =
@@ -217,7 +222,7 @@ public class TestSCMPipelineManager {
     MockNodeManager nodeManagerMock = new MockNodeManager(true,
         20);
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManagerMock,
             pipelineManager.getStateManager(), conf);
@@ -226,9 +231,9 @@ public class TestSCMPipelineManager {
 
     MetricsRecordBuilder metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    long numPipelineCreated = getLongCounter("NumPipelineCreated",
+    long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
         metrics);
-    Assert.assertTrue(numPipelineCreated == 0);
+    Assert.assertTrue(numPipelineAllocated == 0);
 
     // 3 DNs are unhealthy.
     // Create 5 pipelines (Use up 15 Datanodes)
@@ -241,8 +246,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     long numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -261,8 +266,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -272,7 +277,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testActivateDeactivatePipeline() throws IOException {
     final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     final PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482..b06a2fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -143,11 +143,16 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       final int healthy = scm.getNodeCount(HEALTHY);
-      final boolean isReady = healthy == hddsDatanodes.size();
+      final boolean isNodeReady = healthy == hddsDatanodes.size();
+      final boolean exitSafeMode = !scm.isInSafeMode();
+
       LOG.info("{}. Got {} of {} DN Heartbeats.",
-          isReady? "Cluster is ready" : "Waiting for cluster to be ready",
+          isNodeReady? "Nodes are ready" : "Waiting for nodes to be ready",
+          healthy, hddsDatanodes.size());
+      LOG.info(exitSafeMode? "Cluster exits safe mode" :
+              "Waiting for cluster to exit safe mode",
           healthy, hddsDatanodes.size());
-      return isReady;
+      return isNodeReady && exitSafeMode;
     }, 1000, waitForClusterToBeReadyTimeout);
   }
 
@@ -615,7 +620,6 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (hbInterval.isPresent()) {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             hbInterval.get(), TimeUnit.MILLISECONDS);
-
       } else {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             DEFAULT_HB_INTERVAL_MS,
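
The readiness change above means waitForClusterToBeReady now gates on two
independent conditions. For quick reference, a condensed sketch of the new
predicate (identifiers from the patch; logging omitted):

    GenericTestUtils.waitFor(() -> {
      // Every datanode must have heartbeated in as HEALTHY...
      final boolean nodesReady =
          scm.getNodeCount(HEALTHY) == hddsDatanodes.size();
      // ...and SCM must have left safe mode before the cluster is usable.
      return nodesReady && !scm.isInSafeMode();
    }, 1000, waitForClusterToBeReadyTimeout);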

