This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 05c7a98  HDDS-5253. Support container move HA (#2488)
05c7a98 is described below

commit 05c7a98ede0d9f1d5995ad5827055db9a9866bc7
Author: Jackson Yao <[email protected]>
AuthorDate: Mon Aug 16 19:17:13 2021 +0800

    HDDS-5253. Support container move HA (#2488)
---
 .../container/common/helpers/MoveDataNodePair.java |  71 ++++
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |   6 +
 .../interface-client/src/main/proto/hdds.proto     |   5 +
 .../src/main/proto/SCMRatisProtocol.proto          |   1 +
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   8 +-
 .../hdds/scm/block/SCMBlockDeletingService.java    |   5 +-
 .../hdds/scm/container/ContainerReplicaCount.java  |   1 +
 .../hdds/scm/container/ReplicationManager.java     | 464 +++++++++++++++++----
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |   2 +
 .../hdds/scm/metadata/MoveDataNodePairCodec.java   |  53 +++
 .../hadoop/hdds/scm/metadata/SCMDBDefinition.java  |  13 +-
 .../hdds/scm/metadata/SCMMetadataStoreImpl.java    |  13 +
 .../hdds/scm/server/StorageContainerManager.java   |   4 +-
 .../hdds/scm/container/TestReplicationManager.java | 229 +++++++---
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   1 +
 ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java |   6 +
 .../hadoop/ozone/TestStorageContainerManager.java  |   7 +-
 .../hadoop/ozone/scm/TestFailoverWithSCMHA.java    |  91 ++++
 .../scm/node/TestDecommissionAndMaintenance.java   |   6 +-
 19 files changed, 826 insertions(+), 160 deletions(-)

diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java
new file mode 100644
index 0000000..578134e
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * MoveDataNodePair encapsulates the source and target
+ * datanodes of a move operation.
+ */
+public class MoveDataNodePair {
+  /**
+   * source datanode of current move option.
+   */
+  private final DatanodeDetails src;
+
+  /**
+   * target datanode of current move option.
+   */
+  private final DatanodeDetails tgt;
+
+  public MoveDataNodePair(DatanodeDetails src, DatanodeDetails tgt) {
+    this.src = src;
+    this.tgt = tgt;
+  }
+
+  public DatanodeDetails getTgt() {
+    return tgt;
+  }
+
+  public DatanodeDetails getSrc() {
+    return src;
+  }
+
+  public HddsProtos.MoveDataNodePairProto getProtobufMessage(int clientVersion)
+      throws IOException {
+    HddsProtos.MoveDataNodePairProto.Builder builder =
+        HddsProtos.MoveDataNodePairProto.newBuilder()
+            .setSrc(src.toProto(clientVersion))
+            .setTgt(tgt.toProto(clientVersion));
+    return builder.build();
+  }
+
+  public static MoveDataNodePair getFromProtobuf(
+      HddsProtos.MoveDataNodePairProto mdnpp) {
+    Preconditions.assertNotNull(mdnpp, "MoveDataNodePair is null");
+    DatanodeDetails src = DatanodeDetails.getFromProtoBuf(mdnpp.getSrc());
+    DatanodeDetails tgt = DatanodeDetails.getFromProtoBuf(mdnpp.getTgt());
+    return new MoveDataNodePair(src, tgt);
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 95dc477..a63d90d0 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
 import org.apache.hadoop.hdds.utils.DBStoreHAManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -147,4 +148,9 @@ public interface SCMMetadataStore extends DBStoreHAManager {
    * Table that maintains sequence id information.
    */
   Table<String, Long> getSequenceIdTable();
+
+  /**
+   * Table that maintains move information.
+   */
+  Table<ContainerID, MoveDataNodePair> getMoveTable();
 }
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index f115d0c..ee50354 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -62,6 +62,11 @@ message ExtendedDatanodeDetailsProto {
     optional string buildDate = 5;
 }
 
+message MoveDataNodePairProto {
+    required DatanodeDetailsProto src = 1;
+    required DatanodeDetailsProto tgt = 2;
+}
+
 /**
  Proto message encapsulating information required to uniquely identify a
  OzoneManager.
diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
index 8066c8b..8e4237e 100644
--- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
@@ -26,6 +26,7 @@ enum RequestType {
     BLOCK = 3;
     SEQUENCE_ID = 4;
     CERT_STORE = 5;
+    MOVE = 6;
 }
 
 message Method {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 6a009f3..1673f30 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -32,10 +32,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -69,7 +67,6 @@ public class BlockManagerImpl implements BlockManager, 
BlockmanagerMXBean {
 
   private final StorageContainerManager scm;
   private final PipelineManager pipelineManager;
-  private final ContainerManagerV2 containerManager;
   private final WritableContainerFactory writableContainerFactory;
 
   private final long containerSize;
@@ -78,7 +75,6 @@ public class BlockManagerImpl implements BlockManager, 
BlockmanagerMXBean {
   private final SCMBlockDeletingService blockDeletingService;
 
   private ObjectName mxBean;
-  private final PipelineChoosePolicy pipelineChoosePolicy;
   private final SequenceIdGenerator sequenceIdGen;
   private ScmBlockDeletingServiceMetrics metrics;
   /**
@@ -94,8 +90,6 @@ public class BlockManagerImpl implements BlockManager, 
BlockmanagerMXBean {
     Objects.requireNonNull(scm, "SCM cannot be null");
     this.scm = scm;
     this.pipelineManager = scm.getPipelineManager();
-    this.containerManager = scm.getContainerManager();
-    this.pipelineChoosePolicy = scm.getPipelineChoosePolicy();
     this.sequenceIdGen = scm.getSequenceIdGen();
     this.containerSize = (long)conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
@@ -123,7 +117,7 @@ public class BlockManagerImpl implements BlockManager, 
BlockmanagerMXBean {
             OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
             TimeUnit.MILLISECONDS);
     blockDeletingService =
-        new SCMBlockDeletingService(deletedBlockLog, containerManager,
+        new SCMBlockDeletingService(deletedBlockLog,
             scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
             scm.getSCMServiceManager(), svcInterval, serviceTimeout, conf,
             metrics);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 7776c56..cd50e7d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.ScmConfig;
-import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -67,7 +66,6 @@ public class SCMBlockDeletingService extends BackgroundService
 
   private static final int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 1;
   private final DeletedBlockLog deletedBlockLog;
-  private final ContainerManagerV2 containerManager;
   private final NodeManager nodeManager;
   private final EventPublisher eventPublisher;
   private final SCMContext scmContext;
@@ -83,14 +81,13 @@ public class SCMBlockDeletingService extends 
BackgroundService
 
   @SuppressWarnings("parameternumber")
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
-      ContainerManagerV2 containerManager, NodeManager nodeManager,
+              NodeManager nodeManager,
       EventPublisher eventPublisher, SCMContext scmContext,
       SCMServiceManager serviceManager, Duration interval, long serviceTimeout,
       ConfigurationSource conf, ScmBlockDeletingServiceMetrics metrics) {
     super("SCMBlockDeletingService", interval.toMillis(), TimeUnit.MILLISECONDS,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
-    this.containerManager = containerManager;
     this.nodeManager = nodeManager;
     this.eventPublisher = eventPublisher;
     this.scmContext = scmContext;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
index bf8c3b9..26368b4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
 import java.util.Set;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 97fda61..009c850 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
+import java.lang.reflect.Proxy;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -27,6 +28,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,8 +43,6 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -55,18 +55,27 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import static org.apache.hadoop.hdds.protocol.proto.
+    SCMRatisProtocol.RequestType.MOVE;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -74,6 +83,8 @@ import 
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.hdds.utils.db.Table;
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 
 import com.google.protobuf.GeneratedMessage;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
@@ -134,12 +145,6 @@ public class ReplicationManager implements SCMService {
    */
   private final Map<ContainerID, List<InflightAction>> inflightDeletion;
 
-  /**
-   * This is used for tracking container move commands
-   * which are not yet complete.
-   */
-  private final Map<ContainerID,
-      Pair<DatanodeDetails, DatanodeDetails>> inflightMove;
 
   /**
    * This is used for indicating the result of move option and
@@ -150,7 +155,9 @@ public class ReplicationManager implements SCMService {
     // both replication and deletion are completed
     COMPLETED,
     // RM is not running
-    RM_NOT_RUNNING,
+    FAIL_NOT_RUNNING,
+    // RM is not ratis leader
+    FAIL_NOT_LEADER,
     // replication fail because the container does not exist in src
     REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
     // replication fail because the container exists in target
@@ -184,7 +191,9 @@ public class ReplicationManager implements SCMService {
     //unexpected action, remove src at inflightReplication
     UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION,
     //unexpected action, remove target at inflightDeletion
-    UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION
+    UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION,
+    //write DB error
+    FAIL_CAN_NOT_RECORD_TO_DB
   }
 
   /**
@@ -233,6 +242,11 @@ public class ReplicationManager implements SCMService {
   private ReplicationManagerMetrics metrics;
 
   /**
+   * Scheduler for move operations.
+   */
+  private final MoveScheduler moveScheduler;
+
+  /**
    * Constructs ReplicationManager instance with the given configuration.
    *
    * @param conf OzoneConfiguration
@@ -242,13 +256,16 @@ public class ReplicationManager implements SCMService {
    */
   @SuppressWarnings("parameternumber")
   public ReplicationManager(final ConfigurationSource conf,
-                            final ContainerManagerV2 containerManager,
-                            final PlacementPolicy containerPlacement,
-                            final EventPublisher eventPublisher,
-                            final SCMContext scmContext,
-                            final SCMServiceManager serviceManager,
-                            final NodeManager nodeManager,
-                            final java.time.Clock clock) {
+             final ContainerManagerV2 containerManager,
+             final PlacementPolicy containerPlacement,
+             final EventPublisher eventPublisher,
+             final SCMContext scmContext,
+             final SCMServiceManager serviceManager,
+             final NodeManager nodeManager,
+             final java.time.Clock clock,
+             final SCMHAManager scmhaManager,
+             final Table<ContainerID, MoveDataNodePair> moveTable)
+             throws IOException {
     this.containerManager = containerManager;
     this.containerPlacement = containerPlacement;
     this.eventPublisher = eventPublisher;
@@ -258,7 +275,6 @@ public class ReplicationManager implements SCMService {
     this.running = false;
     this.inflightReplication = new ConcurrentHashMap<>();
     this.inflightDeletion = new ConcurrentHashMap<>();
-    this.inflightMove = new ConcurrentHashMap<>();
     this.inflightMoveFuture = new ConcurrentHashMap<>();
     this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
     this.clock = clock;
@@ -269,6 +285,11 @@ public class ReplicationManager implements SCMService {
         TimeUnit.MILLISECONDS);
     this.metrics = null;
 
+    moveScheduler = new MoveSchedulerImpl.Builder()
+        .setDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setMoveTable(moveTable).build();
+
     // register ReplicationManager to SCMServiceManager.
     serviceManager.register(this);
 
@@ -317,9 +338,6 @@ public class ReplicationManager implements SCMService {
       LOG.info("Stopping Replication Monitor Thread.");
       inflightReplication.clear();
       inflightDeletion.clear();
-      //TODO: replicate inflight move through ratis
-      inflightMove.clear();
-      inflightMoveFuture.clear();
       running = false;
       metrics.unRegister();
       notifyAll();
@@ -581,15 +599,15 @@ public class ReplicationManager implements SCMService {
       throws ContainerNotFoundException {
     // make sure inflightMove contains the container
     ContainerID id = container.containerID();
-    if (!inflightMove.containsKey(id)) {
-      return;
-    }
 
     // make sure the datanode , which is removed from inflightActions,
     // is source or target datanode.
-    Pair<DatanodeDetails, DatanodeDetails> kv = inflightMove.get(id);
-    final boolean isSource = kv.getKey().equals(dn);
-    final boolean isTarget = kv.getValue().equals(dn);
+    MoveDataNodePair kv = moveScheduler.getMoveDataNodePair(id);
+    if (kv == null) {
+      return;
+    }
+    final boolean isSource = kv.getSrc().equals(dn);
+    final boolean isTarget = kv.getTgt().equals(dn);
     if (!isSource && !isTarget) {
       return;
     }
@@ -610,50 +628,50 @@ public class ReplicationManager implements SCMService {
      */
 
     if (isSource && isInflightReplication) {
-      inflightMoveFuture.get(id).complete(
+      //if RM is reinitialized, inflightMove will be restored,
+      //but inflightMoveFuture will not, so there can be a case where
+      //a container is in inflightMove but not in inflightMoveFuture.
+      compleleteMoveFutureWithResult(id,
           MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION);
-      inflightMove.remove(id);
-      inflightMoveFuture.remove(id);
+      moveScheduler.completeMove(id.getProtobuf());
       return;
     }
 
     if (isTarget && !isInflightReplication) {
-      inflightMoveFuture.get(id).complete(
+      compleleteMoveFutureWithResult(id,
           MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION);
-      inflightMove.remove(id);
-      inflightMoveFuture.remove(id);
+      moveScheduler.completeMove(id.getProtobuf());
       return;
     }
 
     if (!(isInflightReplication && isCompleted)) {
       if (isInflightReplication) {
         if (isUnhealthy) {
-          inflightMoveFuture.get(id).complete(
+          compleleteMoveFutureWithResult(id,
               MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
         } else if (isNotInService) {
-          inflightMoveFuture.get(id).complete(
+          compleleteMoveFutureWithResult(id,
               MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
         } else {
-          inflightMoveFuture.get(id).complete(
+          compleleteMoveFutureWithResult(id,
               MoveResult.REPLICATION_FAIL_TIME_OUT);
         }
       } else {
         if (isUnhealthy) {
-          inflightMoveFuture.get(id).complete(
+          compleleteMoveFutureWithResult(id,
               MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
         } else if (isTimeout) {
-          inflightMoveFuture.get(id).complete(
+          compleleteMoveFutureWithResult(id,
               MoveResult.DELETION_FAIL_TIME_OUT);
         } else if (isNotInService) {
-          inflightMoveFuture.get(id).complete(
+          compleleteMoveFutureWithResult(id,
               MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
         } else {
-          inflightMoveFuture.get(id).complete(
+          compleleteMoveFutureWithResult(id,
               MoveResult.COMPLETED);
         }
       }
-      inflightMove.remove(id);
-      inflightMoveFuture.remove(id);
+      moveScheduler.completeMove(id.getProtobuf());
     } else {
       deleteSrcDnForMove(container,
           containerManager.getContainerReplicas(id));
@@ -664,15 +682,32 @@ public class ReplicationManager implements SCMService {
    * add a move action for a given container.
    *
    * @param cid Container to move
-   * @param srcDn datanode to move from
-   * @param targetDn datanode to move to
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  public CompletableFuture<MoveResult> move(ContainerID cid,
+             DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    return move(cid, new MoveDataNodePair(src, tgt));
+  }
+
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param mp MoveDataNodePair which contains source and target datanodes
    */
   public CompletableFuture<MoveResult> move(ContainerID cid,
-      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      MoveDataNodePair mp)
       throws ContainerNotFoundException, NodeNotFoundException {
     CompletableFuture<MoveResult> ret = new CompletableFuture<>();
     if (!isRunning()) {
-      ret.complete(MoveResult.RM_NOT_RUNNING);
+      ret.complete(MoveResult.FAIL_NOT_RUNNING);
+      return ret;
+    }
+
+    if (!scmContext.isLeader()) {
+      ret.complete(MoveResult.FAIL_NOT_LEADER);
       return ret;
     }
 
@@ -693,6 +728,8 @@ public class ReplicationManager implements SCMService {
      * of deletion always depends on placement policy
      */
 
+    DatanodeDetails srcDn = mp.getSrc();
+    DatanodeDetails targetDn = mp.getTgt();
     NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
     NodeState healthStat = currentNodeStat.getHealth();
     NodeOperationalState operationalState =
@@ -777,7 +814,15 @@ public class ReplicationManager implements SCMService {
         return ret;
       }
 
-      inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn));
+      try {
+        moveScheduler.startMove(cid.getProtobuf(),
+            mp.getProtobufMessage(CURRENT_VERSION));
+      } catch (IOException e) {
+        LOG.warn("Exception while starting move {}", cid);
+        ret.complete(MoveResult.FAIL_CAN_NOT_RECORD_TO_DB);
+        return ret;
+      }
+
       inflightMoveFuture.putIfAbsent(cid, ret);
       sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn));
     }
@@ -1213,47 +1258,45 @@ public class ReplicationManager implements SCMService {
   private void deleteSrcDnForMove(final ContainerInfo cif,
                    final Set<ContainerReplica> replicaSet) {
     final ContainerID cid = cif.containerID();
-    if (inflightMove.containsKey(cid)) {
-      Pair<DatanodeDetails, DatanodeDetails> movePair =
-          inflightMove.get(cid);
-      final DatanodeDetails srcDn = movePair.getKey();
-      ContainerReplicaCount replicaCount =
-          getContainerReplicaCount(cif, replicaSet);
-
-      if(!replicaSet.stream()
-          .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){
-        // if the target is present but source disappears somehow,
-        // we can consider move is successful.
-        inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED);
-        inflightMove.remove(cid);
-        inflightMoveFuture.remove(cid);
-        return;
-      }
+    MoveDataNodePair movePair = moveScheduler.getMoveDataNodePair(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails srcDn = movePair.getSrc();
+    ContainerReplicaCount replicaCount =
+        getContainerReplicaCount(cif, replicaSet);
+
+    if(!replicaSet.stream()
+        .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){
+      // if the target is present but source disappears somehow,
+      // we can consider move is successful.
+      compleleteMoveFutureWithResult(cid, MoveResult.COMPLETED);
+      moveScheduler.completeMove(cid.getProtobuf());
+      return;
+    }
 
-      int replicationFactor =
-          cif.getReplicationConfig().getRequiredNodes();
-      ContainerPlacementStatus currentCPS =
-          getPlacementStatus(replicaSet, replicationFactor);
-      Set<ContainerReplica> newReplicaSet = replicaSet.
-          stream().collect(Collectors.toSet());
-      newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
-      ContainerPlacementStatus newCPS =
-          getPlacementStatus(newReplicaSet, replicationFactor);
-
-      if (replicaCount.isOverReplicated() &&
-          isPlacementStatusActuallyEqual(currentCPS, newCPS)) {
-        sendDeleteCommand(cif, srcDn, true);
-      } else {
-        // if source and target datanode are both in the replicaset,
-        // but we can not delete source datanode for now (e.g.,
-        // there is only 3 replicas or not policy-statisfied , etc.),
-        // we just complete the future without sending a delete command.
-        LOG.info("can not remove source replica after successfully " +
-            "replicated to target datanode");
-        inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY);
-        inflightMove.remove(cid);
-        inflightMoveFuture.remove(cid);
-      }
+    int replicationFactor =
+        cif.getReplicationConfig().getRequiredNodes();
+    ContainerPlacementStatus currentCPS =
+        getPlacementStatus(replicaSet, replicationFactor);
+    Set<ContainerReplica> newReplicaSet = replicaSet.
+        stream().collect(Collectors.toSet());
+    newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+    ContainerPlacementStatus newCPS =
+        getPlacementStatus(newReplicaSet, replicationFactor);
+
+    if (replicaCount.isOverReplicated() &&
+        isPlacementStatusActuallyEqual(currentCPS, newCPS)) {
+      sendDeleteCommand(cif, srcDn, true);
+    } else {
+      // if source and target datanodes are both in the replica set,
+      // but we can not delete the source replica for now (e.g.,
+      // there are only 3 replicas or the policy is not satisfied, etc.),
+      // we just complete the future without sending a delete command.
+      LOG.info("can not remove source replica after successfully " +
+          "replicated to target datanode");
+      compleleteMoveFutureWithResult(cid, MoveResult.DELETE_FAIL_POLICY);
+      moveScheduler.completeMove(cid.getProtobuf());
     }
   }
 
@@ -1680,6 +1723,9 @@ public class ReplicationManager implements SCMService {
           lastTimeToBeReadyInMillis = clock.millis();
           serviceStatus = ServiceStatus.RUNNING;
         }
+        //now that the current scm is leader and its state is up-to-date,
+        //we need to take some action on the replicated inflight moves.
+        onLeaderReadyAndOutOfSafeMode();
       } else {
         serviceStatus = ServiceStatus.PAUSING;
       }
@@ -1717,8 +1763,244 @@ public class ReplicationManager implements SCMService {
     return inflightDeletion;
   }
 
-  public Map<ContainerID,
-      Pair<DatanodeDetails, DatanodeDetails>> getInflightMove() {
-    return inflightMove;
+  public Map<ContainerID, CompletableFuture<MoveResult>> getInflightMove() {
+    return inflightMoveFuture;
   }
-}
\ No newline at end of file
+
+  /**
+   * Makes the move operation HA aware.
+   */
+  public interface MoveScheduler {
+    /**
+     * complete a move action for a given container.
+     *
+     * @param contianerIDProto Container for which the move is finished
+     */
+    @Replicate
+    void completeMove(HddsProtos.ContainerID contianerIDProto);
+
+    /**
+     * start a move action for a given container.
+     *
+     * @param contianerIDProto Container to move
+     * @param mp encapsulates the source and target datanode infos
+     */
+    @Replicate
+    void startMove(HddsProtos.ContainerID contianerIDProto,
+              HddsProtos.MoveDataNodePairProto mp) throws IOException;
+
+    /**
+     * get the MoveDataNodePair of the given container.
+     *
+     * @param cid Container to move
+     * @return null if cid is not found in MoveScheduler,
+     *          or the corresponding MoveDataNodePair
+     */
+    MoveDataNodePair getMoveDataNodePair(ContainerID cid);
+
+    /**
+     * Reinitialize the MoveScheduler with DB if become leader.
+     */
+    void reinitialize(Table<ContainerID,
+        MoveDataNodePair> moveTable) throws IOException;
+
+    /**
+     * get all the inflight move info.
+     */
+    Map<ContainerID, MoveDataNodePair> getInflightMove();
+  }
+
+  /**
+   * @return the moveScheduler of RM
+   */
+  public MoveScheduler getMoveScheduler() {
+    return moveScheduler;
+  }
+
+  /**
+   * Ratis based MoveScheduler, db operations are stored in
+   * DBTransactionBuffer until a snapshot is taken.
+   */
+  public static final class MoveSchedulerImpl implements MoveScheduler {
+    private Table<ContainerID, MoveDataNodePair> moveTable;
+    private final DBTransactionBuffer transactionBuffer;
+    /**
+     * This is used for tracking container move commands
+     * which are not yet complete.
+     */
+    private final Map<ContainerID, MoveDataNodePair> inflightMove;
+
+    private MoveSchedulerImpl(Table<ContainerID, MoveDataNodePair> moveTable,
+                DBTransactionBuffer transactionBuffer) throws IOException {
+      this.moveTable = moveTable;
+      this.transactionBuffer = transactionBuffer;
+      this.inflightMove = new ConcurrentHashMap<>();
+      initialize();
+    }
+
+    @Override
+    public void completeMove(HddsProtos.ContainerID contianerIDProto) {
+      ContainerID cid = null;
+      try {
+        cid = ContainerID.getFromProtobuf(contianerIDProto);
+        transactionBuffer.removeFromBuffer(moveTable, cid);
+      } catch (IOException e) { // best effort: map entry is still dropped
+        LOG.warn("Exception while completing move {}", cid, e);
+      }
+      inflightMove.remove(cid);
+    }
+
+    @Override
+    public void startMove(HddsProtos.ContainerID contianerIDProto,
+                          HddsProtos.MoveDataNodePairProto mdnpp)
+        throws IOException {
+      ContainerID cid = null;
+      try {
+        cid = ContainerID.getFromProtobuf(contianerIDProto);
+        final MoveDataNodePair mp = MoveDataNodePair.getFromProtobuf(mdnpp);
+        // an already-tracked container keeps its existing move record
+        if(!inflightMove.containsKey(cid)) {
+          transactionBuffer.addToBuffer(moveTable, cid, mp);
+          inflightMove.putIfAbsent(cid, mp);
+        }
+      } catch (IOException e) {
+        LOG.warn("Exception while starting move {}", cid, e);
+      }
+    }
+
+    @Override
+    public MoveDataNodePair getMoveDataNodePair(ContainerID cid) {
+      return inflightMove.get(cid);
+    }
+
+    @Override
+    public void reinitialize(Table<ContainerID,
+        MoveDataNodePair> mt) throws IOException {
+      moveTable = mt;
+      inflightMove.clear();
+      initialize();
+    }
+
+    private void initialize() throws IOException {
+      // rebuild the in-memory view from every row currently in moveTable;
+      // try-with-resources closes the table iterator when the scan ends
+      try (TableIterator<ContainerID,
+          ? extends Table.KeyValue<ContainerID, MoveDataNodePair>>
+          iterator = moveTable.iterator()) {
+        while (iterator.hasNext()) {
+          Table.KeyValue<ContainerID, MoveDataNodePair> kv = iterator.next();
+          final ContainerID cid = kv.getKey();
+          final MoveDataNodePair mp = kv.getValue();
+          Preconditions.assertNotNull(cid,
+              "moved container id should not be null");
+          Preconditions.assertNotNull(mp, "MoveDataNodePair must not be null");
+          inflightMove.put(cid, mp);
+        }
+      }
+    }
+
+    @Override
+    public Map<ContainerID, MoveDataNodePair> getInflightMove() {
+      return inflightMove;
+    }
+
+    /**
+     * Builder for Ratis based MoveScheduler.
+     */
+    public static class Builder {
+      private Table<ContainerID, MoveDataNodePair> moveTable;
+      private DBTransactionBuffer transactionBuffer;
+      private SCMRatisServer ratisServer;
+
+      public Builder setRatisServer(final SCMRatisServer scmRatisServer) {
+        ratisServer = scmRatisServer;
+        return this;
+      }
+
+      public Builder setMoveTable(
+          final Table<ContainerID, MoveDataNodePair> mt) {
+        moveTable = mt;
+        return this;
+      }
+
+      public Builder setDBTransactionBuffer(DBTransactionBuffer trxBuffer) {
+        transactionBuffer = trxBuffer;
+        return this;
+      }
+
+      public MoveScheduler build() throws IOException {
+        Preconditions.assertNotNull(moveTable, "moveTable is null");
+        Preconditions.assertNotNull(transactionBuffer,
+            "transactionBuffer is null");
+
+        final MoveScheduler impl =
+            new MoveSchedulerImpl(moveTable, transactionBuffer);
+
+        final SCMHAInvocationHandler invocationHandler
+            = new SCMHAInvocationHandler(MOVE, impl, ratisServer);
+
+        return (MoveScheduler) Proxy.newProxyInstance(
+            SCMHAInvocationHandler.class.getClassLoader(),
+            new Class<?>[]{MoveScheduler.class},
+            invocationHandler);
+      }
+    }
+  }
+
+  /**
+   * when scm becomes LeaderReady and is out of safe mode, some actions
+   * should be taken. for now, it is only used to handle replicated
+   * inflight moves.
+   */
+  private void onLeaderReadyAndOutOfSafeMode() {
+    List<HddsProtos.ContainerID> needToRemove = new LinkedList<>();
+    moveScheduler.getInflightMove().forEach((k, v) -> {
+      Set<ContainerReplica> replicas;
+      ContainerInfo cif;
+      try {
+        replicas = containerManager.getContainerReplicas(k);
+        cif = containerManager.getContainer(k);
+      } catch (ContainerNotFoundException e) {
+        needToRemove.add(k.getProtobuf());
+        LOG.error("can not find container {} " +
+            "while processing replicated move", k);
+        return;
+      }
+      boolean isSrcExist = replicas.stream()
+          .anyMatch(r -> r.getDatanodeDetails().equals(v.getSrc()));
+      boolean isTgtExist = replicas.stream()
+          .anyMatch(r -> r.getDatanodeDetails().equals(v.getTgt()));
+
+      if(isSrcExist) {
+        if(isTgtExist) {
+          //the former scm leader may or may not have sent the deletion command
+          //before reelection. here, we just try to send the command again.
+          deleteSrcDnForMove(cif, replicas);
+        } else {
+          // resending replication command is ok, no matter whether there is an
+          // on-going replication
+          sendReplicateCommand(cif, v.getTgt(),
+              Collections.singletonList(v.getSrc()));
+        }
+      } else {
+        // if container does not exist in src datanode, whether or not it
+        // exists in target datanode, we can not take more actions on this
+        // move, so just remove it through ratis
+        needToRemove.add(k.getProtobuf());
+      }
+    });
+
+    needToRemove.forEach(moveScheduler::completeMove);
+  }
+
+  /**
+   * complete the CompletableFuture of the container in the given Map with
+   * a given MoveResult.
+   */
+  private void compleleteMoveFutureWithResult(ContainerID cid, MoveResult mr){
+    if(inflightMoveFuture.containsKey(cid)) {
+      inflightMoveFuture.get(cid).complete(mr);
+      inflightMoveFuture.remove(cid);
+    }
+  }
+}
+
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index de33120..3821575 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -361,6 +361,8 @@ public class SCMHAManagerImpl implements SCMHAManager {
     scm.getContainerManager().reinitialize(metadataStore.getContainerTable());
     scm.getScmBlockManager().getDeletedBlockLog().reinitialize(
         metadataStore.getDeletedBlocksTXTable());
+    scm.getReplicationManager().getMoveScheduler()
+        .reinitialize(metadataStore.getMoveTable());
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
       if (scm.getRootCertificateServer() != null) {
         scm.getRootCertificateServer().reinitialize(metadataStore);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java
new file mode 100644
index 0000000..24601a5
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.MoveDataNodePairProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
+
+/**
+ * Codec to serialize / deserialize MoveDataNodePair.
+ */
+
+public class MoveDataNodePairCodec implements Codec<MoveDataNodePair> {
+  @Override
+  public byte[] toPersistedFormat(MoveDataNodePair mdnp)
+      throws IOException {
+    return mdnp.getProtobufMessage(CURRENT_VERSION).toByteArray();
+  }
+
+  @Override
+  public MoveDataNodePair fromPersistedFormat(byte[] rawData)
+      throws IOException {
+    MoveDataNodePairProto.Builder builder =
+        MoveDataNodePairProto.newBuilder(
+            MoveDataNodePairProto.PARSER.parseFrom(rawData));
+    return MoveDataNodePair.getFromProtobuf(builder.build());
+  }
+
+  @Override
+  public MoveDataNodePair copyObject(MoveDataNodePair object) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index 7da36d1..b3e861b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -24,6 +24,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfoCodec;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -143,6 +144,16 @@ public class SCMDBDefinition implements DBDefinition {
           Long.class,
           new LongCodec());
 
+  public static final DBColumnFamilyDefinition<ContainerID,
+      MoveDataNodePair>
+      MOVE =
+      new DBColumnFamilyDefinition<>(
+          "move",
+          ContainerID.class,
+          new ContainerIDCodec(),
+          MoveDataNodePair.class,
+          new MoveDataNodePairCodec());
+
   @Override
   public String getName() {
     return "scm.db";
@@ -157,6 +168,6 @@ public class SCMDBDefinition implements DBDefinition {
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
         VALID_SCM_CERTS, REVOKED_CERTS, REVOKED_CERTS_V2, PIPELINES, 
CONTAINERS,
-        TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID};
+        TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID, MOVE};
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index 170fdde..799e128 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
 import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -43,6 +44,7 @@ import static 
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CRLS;
 import static 
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CRL_SEQUENCE_ID;
 import static 
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.MOVE;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
 import static 
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
 import static 
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS_V2;
@@ -84,6 +86,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore 
{
 
   private Table<String, Long> sequenceIdTable;
 
+  private Table<ContainerID, MoveDataNodePair> moveTable;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
   private DBStore store;
@@ -166,6 +170,10 @@ public class SCMMetadataStoreImpl implements 
SCMMetadataStore {
       sequenceIdTable = SEQUENCE_ID.getTable(store);
 
       checkTableStatus(sequenceIdTable, SEQUENCE_ID.getName());
+
+      moveTable = MOVE.getTable(store);
+
+      checkTableStatus(moveTable, MOVE.getName());
     }
   }
 
@@ -266,6 +274,11 @@ public class SCMMetadataStoreImpl implements 
SCMMetadataStore {
     return sequenceIdTable;
   }
 
+  @Override
+  public Table<ContainerID, MoveDataNodePair> getMoveTable() {
+    return moveTable;
+  }
+
   private void checkTableStatus(Table table, String name) throws IOException {
     String logMessage = "Unable to get a reference to %s table. Cannot " +
         "continue.";
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index d268f2c..e9d439e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -606,7 +606,9 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
           scmContext,
           serviceManager,
           scmNodeManager,
-          new MonotonicClock(ZoneOffset.UTC));
+          new MonotonicClock(ZoneOffset.UTC),
+          scmHAManager,
+          getScmMetadataStore().getMoveTable());
     }
     if(configurator.getScmSafeModeManager() != null) {
       scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index d7fcde7..631ab2b 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.primitives.Longs;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -31,18 +32,24 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.container.ReplicationManager
     .ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
-import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.TestClock;
@@ -52,6 +59,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -95,24 +103,27 @@ public class TestReplicationManager {
   private DatanodeCommandHandler datanodeCommandHandler;
   private SimpleMockNodeManager nodeManager;
   private ContainerManagerV2 containerManager;
-  private OzoneConfiguration conf;
-  private SCMNodeManager scmNodeManager;
   private GenericTestUtils.LogCapturer scmLogs;
+  private SCMServiceManager serviceManager;
   private TestClock clock;
+  private File testDir;
+  private DBStore dbStore;
 
   @Before
   public void setup()
       throws IOException, InterruptedException, NodeNotFoundException {
-    conf = new OzoneConfiguration();
+    OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         0, TimeUnit.SECONDS);
 
-    scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG);
+    scmLogs = GenericTestUtils.LogCapturer
+      .captureLogs(ReplicationManager.LOG);
     containerManager = Mockito.mock(ContainerManagerV2.class);
     nodeManager = new SimpleMockNodeManager();
     eventQueue = new EventQueue();
     containerStateManager = new ContainerStateManager(conf);
+    serviceManager = new SCMServiceManager();
 
     datanodeCommandHandler = new DatanodeCommandHandler();
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
@@ -152,28 +163,29 @@ public class TestReplicationManager {
     Mockito.when(containerPlacementPolicy.validateContainerPlacement(
         Mockito.any(),
         Mockito.anyInt()
-        )).thenAnswer(invocation ->  {
-          return new ContainerPlacementStatusDefault(2, 2, 3);
-        });
-
-    scmNodeManager = Mockito.mock(SCMNodeManager.class);
-    Mockito.when(scmNodeManager.getNodeStatus(
-        Mockito.any(DatanodeDetails.class)))
-        .thenReturn(NodeStatus.inServiceHealthy());
-
+        )).thenAnswer(invocation ->
+        new ContainerPlacementStatusDefault(2, 2, 3));
     clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
     createReplicationManager(new ReplicationManagerConfiguration());
   }
 
   private void createReplicationManager(ReplicationManagerConfiguration rmConf)
-      throws InterruptedException {
+      throws InterruptedException, IOException {
     OzoneConfiguration config = new OzoneConfiguration();
+    testDir = GenericTestUtils
+      .getTestDir(TestSCMContainerManager.class.getSimpleName());
+    config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        testDir.getAbsolutePath());
     config.setTimeDuration(
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         0, TimeUnit.SECONDS);
     config.setFromObject(rmConf);
 
-    SCMServiceManager serviceManager = new SCMServiceManager();
+    SCMHAManager scmHAManager = MockSCMHAManager
+        .getInstance(true, new SCMDBTransactionBufferImpl());
+    dbStore = DBStoreBuilder.createDBStore(
+      config, new SCMDBDefinition());
+
     replicationManager = new ReplicationManager(
         config,
         containerManager,
@@ -182,7 +194,9 @@ public class TestReplicationManager {
         SCMContext.emptyContext(),
         serviceManager,
         nodeManager,
-        clock);
+        clock,
+        scmHAManager,
+        SCMDBDefinition.MOVE.getTable(dbStore));
 
     serviceManager.notifyStatusChanged();
     scmLogs.clearOutput();
@@ -998,7 +1012,7 @@ public class TestReplicationManager {
 
   @Test
   public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -1046,7 +1060,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedDueToDecommission() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1060,7 +1074,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedDueToAllDecommission() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1074,7 +1088,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testCorrectlyReplicatedWithDecommission() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1089,7 +1103,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedDueToMaintenance() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1102,12 +1116,13 @@ public class TestReplicationManager {
    * min replica for maintenance is 1 and another replica is available.
    */
   @Test
-  public void testNotUnderReplicatedDueToMaintenanceMinRepOne() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testNotUnderReplicatedDueToMaintenanceMinRepOne()
+      throws Exception {
     replicationManager.stop();
     ReplicationManagerConfiguration newConf =
         new ReplicationManagerConfiguration();
     newConf.setMaintenanceReplicaMinimum(1);
+    dbStore.close();
     createReplicationManager(newConf);
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1121,12 +1136,13 @@ public class TestReplicationManager {
    * are going off line and min rep is 1.
    */
   @Test
-  public void testUnderReplicatedDueToMaintenanceMinRepOne() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testUnderReplicatedDueToMaintenanceMinRepOne()
+      throws Exception {
     replicationManager.stop();
     ReplicationManagerConfiguration newConf =
         new ReplicationManagerConfiguration();
     newConf.setMaintenanceReplicaMinimum(1);
+    dbStore.close();
     createReplicationManager(newConf);
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1141,7 +1157,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedDueToAllMaintenance() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1155,7 +1171,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testCorrectlyReplicatedWithMaintenance() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1170,7 +1186,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedWithDecommissionAndMaintenance() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+      SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1187,7 +1203,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testOverReplicatedClosedContainerWithDecomAndMaint()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+      throws SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1231,7 +1247,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedNotHealthySource()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+      throws SCMException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, NodeStatus.inServiceStale(), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
@@ -1257,7 +1273,8 @@ public class TestReplicationManager {
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
     CompletableFuture<MoveResult> cf =
-        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        replicationManager.move(id,
+            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
     Assert.assertTrue(scmLogs.getOutput().contains(
         "receive a move request about container"));
     Thread.sleep(100L);
@@ -1284,6 +1301,100 @@ public class TestReplicationManager {
   }
 
   /**
+   * If SCM crashes and restarts, the move operation should work as expected.
+   */
+  @Test
+  public void testMoveCrashAndRestart() throws IOException,
+      NodeNotFoundException, InterruptedException {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    Assert.assertTrue(scmLogs.getOutput().contains(
+        "receive a move request about container"));
+    Thread.sleep(100L);
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.replicateContainerCommand, dn3));
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.replicateContainerCommand));
+
+    //crash happens, restart scm.
+    //clear current inflight actions and reload inflightMove from DBStore.
+    resetReplicationManager();
+    replicationManager.getMoveScheduler()
+        .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
+    Assert.assertTrue(replicationManager.getMoveScheduler()
+        .getInflightMove().containsKey(id));
+    MoveDataNodePair kv = replicationManager.getMoveScheduler()
+        .getInflightMove().get(id);
+    Assert.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
+    Assert.assertEquals(kv.getTgt(), dn3);
+    serviceManager.notifyStatusChanged();
+
+    Thread.sleep(100L);
+    // now, the container is not over-replicated,
+    // so no deleteContainerCommand will be sent
+    Assert.assertFalse(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand, 
dn1.getDatanodeDetails()));
+    //replica does not exist in target datanode, so a replicateContainerCommand
+    //will be sent again at notifyStatusChanged#onLeaderReadyAndOutOfSafeMode
+    Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.replicateContainerCommand));
+
+
+    //replicate container to dn3, now, over-replicated
+    addReplicaToDn(container, dn3, CLOSED);
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    //deleteContainerCommand is sent, but the src replica is not deleted now
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.deleteContainerCommand));
+
+    //crash happens, restart scm.
+    //clear current inflight actions and reload inflightMove from DBStore.
+    resetReplicationManager();
+    replicationManager.getMoveScheduler()
+        .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
+    Assert.assertTrue(replicationManager.getMoveScheduler()
+        .getInflightMove().containsKey(id));
+    kv = replicationManager.getMoveScheduler()
+        .getInflightMove().get(id);
+    Assert.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
+    Assert.assertEquals(kv.getTgt(), dn3);
+    serviceManager.notifyStatusChanged();
+
+    //after restart and the container is over-replicated now,
+    //deleteContainerCommand will be sent again
+    Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.deleteContainerCommand));
+    containerStateManager.removeContainerReplica(id, dn1);
+
+    //replica in src datanode is deleted now
+    containerStateManager.removeContainerReplica(id, dn1);
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    //since the move is complete, after scm crashes and restarts
+    //inflightMove should no longer contain the container
+    resetReplicationManager();
+    replicationManager.getMoveScheduler()
+        .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
+    Assert.assertFalse(replicationManager.getMoveScheduler()
+        .getInflightMove().containsKey(id));
+
+    //the CompletableFuture is not stored in DB, so after scm crashes and
+    //restarts, the CompletableFuture is lost
+  }
+
+  /**
    * make sure RM does not delete replica if placement policy is not satisfied.
    */
   @Test
@@ -1300,7 +1411,8 @@ public class TestReplicationManager {
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
     CompletableFuture<MoveResult> cf =
-        replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
+        replicationManager.move(id,
+            new MoveDataNodePair(dn1.getDatanodeDetails(), dn4));
     Assert.assertTrue(scmLogs.getOutput().contains(
         "receive a move request about container"));
     Thread.sleep(100L);
@@ -1341,7 +1453,8 @@ public class TestReplicationManager {
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
     CompletableFuture<MoveResult> cf =
-        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        replicationManager.move(id,
+            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
     Assert.assertTrue(scmLogs.getOutput().contains(
         "receive a move request about container"));
 
@@ -1353,7 +1466,8 @@ public class TestReplicationManager {
         MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
 
     nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
     addReplicaToDn(container, dn3, CLOSED);
     replicationManager.processAll();
     eventQueue.processAll(1000);
@@ -1384,13 +1498,18 @@ public class TestReplicationManager {
     DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
     ContainerReplica dn4 = addReplica(container,
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-
+    
     CompletableFuture<MoveResult> cf;
+    //the above move is executed successfully, so there may be some item in
+    //inflightReplication or inflightDeletion. here we stop replication manager
+    //to clear these states, which may impact the tests below.
+    //we don't need a running replicationManager now
     replicationManager.stop();
     Thread.sleep(100L);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
     Assert.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.RM_NOT_RUNNING);
+        MoveResult.FAIL_NOT_RUNNING);
     replicationManager.start();
     Thread.sleep(100L);
 
@@ -1398,7 +1517,8 @@ public class TestReplicationManager {
     for (LifeCycleState state : LifeCycleState.values()) {
       if (state != LifeCycleState.CLOSED) {
         container.setState(state);
-        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        cf = replicationManager.move(id,
+            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
       }
@@ -1410,10 +1530,12 @@ public class TestReplicationManager {
       if (state != HEALTHY) {
         nodeManager.setNodeStatus(dn3,
             new NodeStatus(IN_SERVICE, state));
-        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        cf = replicationManager.move(id,
+            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
-        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+        cf = replicationManager.move(id,
+            new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
       }
@@ -1426,10 +1548,12 @@ public class TestReplicationManager {
       if (state != IN_SERVICE) {
         nodeManager.setNodeStatus(dn3,
             new NodeStatus(state, HEALTHY));
-        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        cf = replicationManager.move(id,
+            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
-        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+        cf = replicationManager.move(id,
+            new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
       }
@@ -1437,13 +1561,14 @@ public class TestReplicationManager {
     nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
 
     //container exists in target datanode
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(),
-        dn2.getDatanodeDetails());
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(),
+        dn2.getDatanodeDetails()));
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
 
     //container does not exist in source datanode
-    cf = replicationManager.move(id, dn3, dn3);
+    cf = replicationManager.move(id, new MoveDataNodePair(dn3, dn3));
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
 
@@ -1454,7 +1579,8 @@ public class TestReplicationManager {
     replicationManager.processAll();
     //waiting for inflightDeletion generation
     eventQueue.processAll(1000);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
     resetReplicationManager();
@@ -1467,7 +1593,8 @@ public class TestReplicationManager {
     replicationManager.processAll();
     //waiting for inflightReplication generation
     eventQueue.processAll(1000);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
   }
@@ -1561,7 +1688,7 @@ public class TestReplicationManager {
     return replica;
   }
 
-  private void assertReplicaScheduled(int delta) throws InterruptedException {
+  private void assertReplicaScheduled(int delta) {
     final int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
@@ -1588,9 +1715,11 @@ public class TestReplicationManager {
   }
 
   @After
-  public void teardown() throws IOException {
+  public void teardown() throws Exception {
     containerStateManager.close();
     replicationManager.stop();
+    dbStore.close();
+    FileUtils.deleteDirectory(testDir);
   }
 
   private static class DatanodeCommandHandler implements
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index af762d6..80b7240 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.*;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
index c666ef8..fb13d05 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
@@ -29,6 +29,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
@@ -149,6 +150,11 @@ public class TestSCMStoreImplWithOldPipelineIDKeyFormat
     return null;
   }
 
+  @Override
+  public Table<ContainerID, MoveDataNodePair> getMoveTable() {
+    return null;
+  }
+
   /**
    * Test SCM DB Definition for the above class.
    */
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 26afe7e..03bc8ba 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -750,10 +750,9 @@ public class TestStorageContainerManager {
           new TestStorageContainerManagerHelper(cluster, conf);
 
       helper.createKeys(10, 4096);
-      GenericTestUtils.waitFor(() -> {
-        return cluster.getStorageContainerManager().getContainerManager().
-            getContainers() != null;
-      }, 1000, 10000);
+      GenericTestUtils.waitFor(() ->
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainers() != null, 1000, 10000);
 
       StorageContainerManager scm = cluster.getStorageContainerManager();
       List<ContainerInfo> containers = cluster.getStorageContainerManager()
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
index b8ad7d5..1607d92 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
@@ -17,7 +17,10 @@
 package org.apache.hadoop.ozone.scm;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -36,10 +39,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.event.Level;
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
+import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+
 /**
  * Tests failover with SCM HA setup.
  */
@@ -136,6 +144,89 @@ public class TestFailoverWithSCMHA {
         .contains("Performing failover to suggested leader"));
   }
 
+  @Test
+  public void testMoveFailover() throws Exception {
+    SCMClientConfig scmClientConfig =
+        conf.getObject(SCMClientConfig.class);
+    scmClientConfig.setRetryCount(1);
+    scmClientConfig.setRetryInterval(100);
+    scmClientConfig.setMaxRetryTimeout(1500);
+    Assert.assertEquals(scmClientConfig.getRetryCount(), 15);
+    conf.setFromObject(scmClientConfig);
+    StorageContainerManager scm = getLeader(cluster);
+    Assert.assertNotNull(scm);
+
+    final ContainerID id =
+        getContainer(HddsProtos.LifeCycleState.CLOSED).containerID();
+    DatanodeDetails dn1 = randomDatanodeDetails();
+    DatanodeDetails dn2 = randomDatanodeDetails();
+
+    //here we just want to test whether the new leader will get the same
+    //inflight move after failover, so no need to create container and datanode,
+    //just mock them, bypassing all the pre-checks.
+    scm.getReplicationManager().getMoveScheduler().startMove(id.getProtobuf(),
+        (new MoveDataNodePair(dn1, dn2)).getProtobufMessage(CURRENT_VERSION));
+
+    SCMBlockLocationFailoverProxyProvider failoverProxyProvider =
+        new SCMBlockLocationFailoverProxyProvider(conf);
+    failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
+    ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
+        new ScmBlockLocationProtocolClientSideTranslatorPB(
+            failoverProxyProvider);
+    GenericTestUtils
+        .setLogLevel(SCMBlockLocationFailoverProxyProvider.LOG, Level.DEBUG);
+    GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
+        .captureLogs(SCMBlockLocationFailoverProxyProvider.LOG);
+    ScmBlockLocationProtocol scmBlockLocationProtocol = TracingUtil
+        .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
+            conf);
+    scmBlockLocationProtocol.getScmInfo();
+    Assert.assertTrue(logCapture.getOutput()
+        .contains("Performing failover to suggested leader"));
+    scm = getLeader(cluster);
+    Assert.assertNotNull(scm);
+
+    //switch to the new leader successfully, new leader should
+    //get the same inflightMove
+    Map<ContainerID, MoveDataNodePair> inflightMove =
+        scm.getReplicationManager().getMoveScheduler().getInflightMove();
+    Assert.assertTrue(inflightMove.containsKey(id));
+    MoveDataNodePair mp = inflightMove.get(id);
+    Assert.assertTrue(dn2.equals(mp.getTgt()));
+    Assert.assertTrue(dn1.equals(mp.getSrc()));
+
+    //complete move in the new leader
+    scm.getReplicationManager().getMoveScheduler()
+        .completeMove(id.getProtobuf());
+
+
+    SCMContainerLocationFailoverProxyProvider proxyProvider =
+        new SCMContainerLocationFailoverProxyProvider(conf, null);
+    GenericTestUtils.setLogLevel(SCMContainerLocationFailoverProxyProvider.LOG,
+        Level.DEBUG);
+    logCapture = GenericTestUtils.LogCapturer
+        .captureLogs(SCMContainerLocationFailoverProxyProvider.LOG);
+    proxyProvider.changeCurrentProxy(scm.getSCMNodeId());
+    StorageContainerLocationProtocol scmContainerClient =
+        TracingUtil.createProxy(
+            new StorageContainerLocationProtocolClientSideTranslatorPB(
+                proxyProvider), StorageContainerLocationProtocol.class, conf);
+
+    scmContainerClient.allocateContainer(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, "ozone");
+    Assert.assertTrue(logCapture.getOutput()
+        .contains("Performing failover to suggested leader"));
+
+    //switch to the new leader successfully, new leader should
+    //get the same inflightMove, which should no longer contain
+    //that container.
+    scm = getLeader(cluster);
+    Assert.assertNotNull(scm);
+    inflightMove = scm.getReplicationManager()
+        .getMoveScheduler().getInflightMove();
+    Assert.assertFalse(inflightMove.containsKey(id));
+  }
+
   static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) {
     for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
       if (scm.checkLeader()) {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 8dd9b66..7b46941 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -514,7 +514,8 @@ public class TestDecommissionAndMaintenance {
     waitForContainerReplicas(newContainer, 3);
 
     ContainerReplicaCount counts =
-        scm.getReplicationManager().getContainerReplicaCount(newContainer);
+        scm.getReplicationManager()
+          .getContainerReplicaCount(newContainer.containerID());
     assertEquals(1, counts.getMaintenanceCount());
     assertTrue(counts.isSufficientlyReplicated());
 
@@ -541,7 +542,8 @@ public class TestDecommissionAndMaintenance {
     waitForContainerReplicas(nextContainer, 3);
     // There should be no IN_MAINTENANCE node:
     assertEquals(0, nm.getNodeCount(IN_MAINTENANCE, null));
-    counts = 
scm.getReplicationManager().getContainerReplicaCount(newContainer);
+    counts = scm.getReplicationManager()
+      .getContainerReplicaCount(newContainer.containerID());
     assertEquals(0, counts.getMaintenanceCount());
     assertTrue(counts.isSufficientlyReplicated());
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to