This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ccc128  HDDS-4928. Support container move in Replication Manager 
(#2349)
4ccc128 is described below

commit 4ccc1281389344cb660cdb1d8d8119addb2f770c
Author: Jackson Yao <[email protected]>
AuthorDate: Mon Jul 19 14:18:18 2021 +0800

    HDDS-4928. Support container move in Replication Manager (#2349)
---
 .../hdds/scm/container/ReplicationManager.java     | 485 +++++++++++++++++++--
 .../hdds/scm/container/TestReplicationManager.java | 278 +++++++++++-
 2 files changed, 712 insertions(+), 51 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 2903653..d647055 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -40,6 +41,8 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -50,7 +53,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
-import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -136,6 +140,64 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   private final Map<ContainerID, List<InflightAction>> inflightDeletion;
 
   /**
+   * This is used for tracking container move commands
+   * which are not yet complete.
+   */
+  private final Map<ContainerID,
+      Pair<DatanodeDetails, DatanodeDetails>> inflightMove;
+
+  /**
+   * This is used for indicating the result of move operation and
+   * the corresponding reason. This is useful for tracking
+   * the result of move operation.
+   */
+  enum MoveResult {
+    // both replication and deletion are completed
+    COMPLETED,
+    // RM is not running
+    RM_NOT_RUNNING,
+    // replication fail because the container does not exist in src
+    REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
+    // replication fail because the container exists in target
+    REPLICATION_FAIL_EXIST_IN_TARGET,
+    // replication fail because the container is not closed
+    REPLICATION_FAIL_CONTAINER_NOT_CLOSED,
+    // replication fail because the container is in inflightDeletion
+    REPLICATION_FAIL_INFLIGHT_DELETION,
+    // replication fail because the container is in inflightReplication
+    REPLICATION_FAIL_INFLIGHT_REPLICATION,
+    // replication fail because of timeout
+    REPLICATION_FAIL_TIME_OUT,
+    // replication fail because node is not in service
+    REPLICATION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication fail because node is unhealthy
+    REPLICATION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but deletion fail because of timeout
+    DELETION_FAIL_TIME_OUT,
+    // replication succeed, but deletion fail because
+    // node is unhealthy
+    DELETION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but if we delete the container from
+    // the source datanode, the policy (e.g., replica num or
+    // rack location) will not be satisfied, so we should not delete
+    // the container
+    DELETE_FAIL_POLICY,
+    //  replicas + target - src does not satisfy placement policy
+    PLACEMENT_POLICY_NOT_SATISFIED,
+    //unexpected action, remove src at inflightReplication
+    UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION,
+    //unexpected action, remove target at inflightDeletion
+    UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION
+  }
+
+  /**
+   * This is used for tracking the result futures of container move
+   * commands which are not yet complete.
+   */
+  private final Map<ContainerID,
+      CompletableFuture<MoveResult>> inflightMoveFuture;
+
+  /**
    * ReplicationManager specific configuration.
    */
   private final ReplicationManagerConfiguration rmConf;
@@ -194,6 +256,8 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
     this.running = false;
     this.inflightReplication = new ConcurrentHashMap<>();
     this.inflightDeletion = new ConcurrentHashMap<>();
+    this.inflightMove = new ConcurrentHashMap<>();
+    this.inflightMoveFuture = new ConcurrentHashMap<>();
     this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
     this.clock = clock;
 
@@ -214,7 +278,6 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
    */
   @Override
   public synchronized void start() {
-
     if (!isRunning()) {
       DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
           "SCM Replication manager (closed container replication) related "
@@ -264,6 +327,9 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
       LOG.info("Stopping Replication Monitor Thread.");
       inflightReplication.clear();
       inflightDeletion.clear();
+      //TODO: replicate inflight move through ratis
+      inflightMove.clear();
+      inflightMoveFuture.clear();
       running = false;
       DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
       notifyAll();
@@ -458,13 +524,17 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
         try {
           InflightAction a = iter.next();
           NodeStatus status = nodeManager.getNodeStatus(a.datanode);
-          NodeState state = status.getHealth();
-          NodeOperationalState opState = status.getOperationalState();
-          if (state != NodeState.HEALTHY || a.time < deadline ||
-              filter.test(a) || opState != NodeOperationalState.IN_SERVICE) {
+          boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
+          boolean isCompleted = filter.test(a);
+          boolean isTimeout = a.time < deadline;
+          boolean isNotInService = status.getOperationalState() !=
+              NodeOperationalState.IN_SERVICE;
+          if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
             iter.remove();
+            updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
+                container, a.datanode, inflightActions);
           }
-        } catch (NodeNotFoundException e) {
+        } catch (NodeNotFoundException | ContainerNotFoundException e) {
           // Should not happen, but if it does, just remove the action as the
           // node somehow does not exist;
           iter.remove();
@@ -477,6 +547,247 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   }
 
   /**
+   * update inflight move if needed.
+   *
+   * @param isUnhealthy is the datanode unhealthy
+   * @param isCompleted is the action completed
+   * @param isTimeout is the action timeout
+   * @param container Container to update
+   * @param dn datanode which is removed from the inflightActions
+   * @param inflightActions inflightReplication (or) inflightDeletion
+   */
+  private void updateMoveIfNeeded(final boolean isUnhealthy,
+                   final boolean isCompleted, final boolean isTimeout,
+                   final ContainerInfo container, final DatanodeDetails dn,
+                   final Map<ContainerID,
+                       List<InflightAction>> inflightActions)
+      throws ContainerNotFoundException {
+    // make sure inflightMove contains the container
+    ContainerID id = container.containerID();
+    if (!inflightMove.containsKey(id)) {
+      return;
+    }
+
+    // make sure the datanode , which is removed from inflightActions,
+    // is source or target datanode.
+    Pair<DatanodeDetails, DatanodeDetails> kv = inflightMove.get(id);
+    final boolean isSource = kv.getKey().equals(dn);
+    final boolean isTarget = kv.getValue().equals(dn);
+    if (!isSource && !isTarget) {
+      return;
+    }
+    final boolean isInflightReplication =
+        inflightActions.equals(inflightReplication);
+
+    /*
+     * there are several cases:
+     **********************************************************
+     *              * InflightReplication  * InflightDeletion *
+     **********************************************************
+     *source removed*     unexpected       *    expected      *
+     **********************************************************
+     *target removed*      expected        *   unexpected     *
+     **********************************************************
+     * unexpected action may happen somehow. to make it deterministic,
+     * if unexpected action happens, we just fail the completableFuture.
+     */
+
+    if (isSource && isInflightReplication) {
+      inflightMoveFuture.get(id).complete(
+          MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION);
+      inflightMove.remove(id);
+      inflightMoveFuture.remove(id);
+      return;
+    }
+
+    if (isTarget && !isInflightReplication) {
+      inflightMoveFuture.get(id).complete(
+          MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION);
+      inflightMove.remove(id);
+      inflightMoveFuture.remove(id);
+      return;
+    }
+
+    if (!(isInflightReplication && isCompleted)) {
+      if (isInflightReplication) {
+        if (isUnhealthy) {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        } else {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.REPLICATION_FAIL_TIME_OUT);
+        }
+      } else {
+        if (isUnhealthy) {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+        } else if (isTimeout) {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.DELETION_FAIL_TIME_OUT);
+        } else {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.COMPLETED);
+        }
+      }
+      inflightMove.remove(id);
+      inflightMoveFuture.remove(id);
+    } else {
+      deleteSrcDnForMove(container,
+          containerManager.getContainerReplicas(id));
+    }
+  }
+
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public CompletableFuture<MoveResult> move(ContainerID cid,
+      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+    if (!isRunning()) {
+      ret.complete(MoveResult.RM_NOT_RUNNING);
+      return ret;
+    }
+
+    /*
+     * make sure the following conditions are met:
+     *  1 the given two datanodes are in healthy state
+     *  2 the given container exists on the given source datanode
+     *  3 the given container does not exist on the given target datanode
+     *  4 the given container is in closed state
+     *  5 the given container is not taking any inflight action
+     *  6 the given two datanodes are in IN_SERVICE state
+     *  7 {Existing replicas + Target_Dn - Source_Dn} satisfies
+     *     the placement policy
+     *
+     * move is a combination of two steps : replication and deletion.
+     * if the conditions above are all met, then we take a conservative
+     * strategy here : replication can always be executed, but the execution
+     * of deletion always depends on placement policy
+     */
+
+    NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
+    NodeState healthStat = currentNodeStat.getHealth();
+    NodeOperationalState operationalState =
+        currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      return ret;
+    }
+    if (operationalState != NodeOperationalState.IN_SERVICE) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      return ret;
+    }
+
+    currentNodeStat = nodeManager.getNodeStatus(targetDn);
+    healthStat = currentNodeStat.getHealth();
+    operationalState = currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      return ret;
+    }
+    if (operationalState != NodeOperationalState.IN_SERVICE) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      return ret;
+    }
+
+    // we need to synchronize on ContainerInfo, since it is
+    // shared by ICR/FCR handler and this.processContainer
+    // TODO: use a Read lock after introducing a RW lock into ContainerInfo
+    ContainerInfo cif = containerManager.getContainer(cid);
+    synchronized (cif) {
+      final Set<ContainerReplica> currentReplicas = containerManager
+          .getContainerReplicas(cid);
+      final Set<DatanodeDetails> replicas = currentReplicas.stream()
+            .map(ContainerReplica::getDatanodeDetails)
+            .collect(Collectors.toSet());
+      if (replicas.contains(targetDn)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+      if (!replicas.contains(srcDn)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+        return ret;
+      }
+
+      /*
+      * the reason why the given container should not be taking any inflight
+      * action is that: if the given container is being replicated or deleted,
+      * the num of its replica is not deterministic, so move operation issued
+      * by balancer may cause a nondeterministic result, so we should drop
+      * this operation for this time.
+      * */
+
+      if (inflightReplication.containsKey(cid)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      }
+      if (inflightDeletion.containsKey(cid)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+
+      /*
+      * here, no need to see whether cid is in inflightMove, because
+      * these three maps are all synchronized on ContainerInfo. if cid
+      * is in inflightMove, it must now be being replicated or deleted,
+      * so it must be in inflightReplication or in inflightDeletion.
+      * thus, if we can not find cid in both of them, this cid must
+      * not be in inflightMove.
+      */
+
+      LifeCycleState currentContainerStat = cif.getState();
+      if (currentContainerStat != LifeCycleState.CLOSED) {
+        ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+        return ret;
+      }
+
+      // check whether {Existing replicas + Target_Dn - Source_Dn}
+      // satisfies current placement policy
+      if (!isPolicySatisfiedAfterMove(cif, srcDn, targetDn,
+          currentReplicas.stream().collect(Collectors.toList()))) {
+        ret.complete(MoveResult.PLACEMENT_POLICY_NOT_SATISFIED);
+        return ret;
+      }
+
+      inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn));
+      inflightMoveFuture.putIfAbsent(cid, ret);
+      sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn));
+    }
+    LOG.info("receive a move request about container {} , from {} to {}",
+        cid, srcDn.getUuid(), targetDn.getUuid());
+    return ret;
+  }
+
+  /**
+   * Returns whether {Existing replicas + Target_Dn - Source_Dn}
+   * satisfies current placement policy.
+   * @param cif Container Info of moved container
+   * @param srcDn DatanodeDetails of source data node
+   * @param targetDn DatanodeDetails of target data node
+   * @param replicas container replicas
+   * @return whether the placement policy is satisfied after move
+   */
+  private boolean isPolicySatisfiedAfterMove(ContainerInfo cif,
+                    DatanodeDetails srcDn, DatanodeDetails targetDn,
+                    final List<ContainerReplica> replicas){
+    Set<ContainerReplica> movedReplicas =
+        replicas.stream().collect(Collectors.toSet());
+    movedReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+    movedReplicas.add(ContainerReplica.newBuilder()
+        .setDatanodeDetails(targetDn)
+        .setContainerID(cif.containerID())
+        .setContainerState(State.CLOSED).build());
+    ContainerPlacementStatus placementStatus = getPlacementStatus(
+        movedReplicas, cif.getReplicationConfig().getRequiredNodes());
+    return placementStatus.isPolicySatisfied();
+  }
+
+  /**
    * Returns the number replica which are pending creation for the given
    * container ID.
    * @param id The ContainerID for which to check the pending replica
@@ -864,48 +1175,130 @@ public class ReplicationManager implements 
MetricsSource, SCMService {
           break;
         }
       }
-      // After removing all unhealthy replicas, if the container is still over
-      // replicated then we need to check if it is already mis-replicated.
-      // If it is, we do no harm by removing excess replicas. However, if it is
-      // not mis-replicated, then we can only remove replicas if they don't
-      // make the container become mis-replicated.
-      if (excess > 0) {
-        eligibleReplicas.removeAll(unhealthyReplicas);
-        Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
-        ContainerPlacementStatus ps =
-            getPlacementStatus(eligibleSet, replicationFactor);
-        for (ContainerReplica r : eligibleReplicas) {
-          if (excess <= 0) {
-            break;
-          }
-          // First remove the replica we are working on from the set, and then
-          // check if the set is now mis-replicated.
-          eligibleSet.remove(r);
-          ContainerPlacementStatus nowPS =
-              getPlacementStatus(eligibleSet, replicationFactor);
-          if ((!ps.isPolicySatisfied()
-                && nowPS.actualPlacementCount() == ps.actualPlacementCount())
-              || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) {
-            // Remove the replica if the container was already unsatisfied
-            // and losing this replica keep actual placement count unchanged.
-            // OR if losing this replica still keep satisfied
-            sendDeleteCommand(container, r.getDatanodeDetails(), true);
-            excess -= 1;
-            continue;
-          }
-          // If we decided not to remove this replica, put it back into the set
-          eligibleSet.add(r);
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      removeExcessReplicasIfNeeded(excess, container, eligibleReplicas);
+    }
+  }
+
+  /**
+   * if the container is in inflightMove, handle move.
+   * This function assumes replication has been completed
+   *
+   * @param cif ContainerInfo
+   * @param replicaSet A Set of replicas, which may have excess replicas
+   */
+  private void deleteSrcDnForMove(final ContainerInfo cif,
+                   final Set<ContainerReplica> replicaSet) {
+    final ContainerID cid = cif.containerID();
+    if (inflightMove.containsKey(cid)) {
+      Pair<DatanodeDetails, DatanodeDetails> movePair =
+          inflightMove.get(cid);
+      final DatanodeDetails srcDn = movePair.getKey();
+      ContainerReplicaCount replicaCount =
+          getContainerReplicaCount(cif, replicaSet);
+
+      if(!replicaSet.stream()
+          .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){
+        // if the target is present but source disappears somehow,
+        // we can consider move is successful.
+        inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED);
+        inflightMove.remove(cid);
+        inflightMoveFuture.remove(cid);
+        return;
+      }
+
+      int replicationFactor =
+          cif.getReplicationConfig().getRequiredNodes();
+      ContainerPlacementStatus currentCPS =
+          getPlacementStatus(replicaSet, replicationFactor);
+      Set<ContainerReplica> newReplicaSet = replicaSet.
+          stream().collect(Collectors.toSet());
+      newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+      ContainerPlacementStatus newCPS =
+          getPlacementStatus(newReplicaSet, replicationFactor);
+
+      if (replicaCount.isOverReplicated() &&
+          isPlacementStatusActuallyEqual(currentCPS, newCPS)) {
+        sendDeleteCommand(cif, srcDn, true);
+      } else {
+        // if source and target datanode are both in the replicaset,
+        // but we can not delete source datanode for now (e.g.,
+        // there are only 3 replicas or not policy-satisfied, etc.),
+        // we just complete the future without sending a delete command.
+        LOG.info("can not remove source replica after successfully " +
+            "replicated to target datanode");
+        inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY);
+        inflightMove.remove(cid);
+        inflightMoveFuture.remove(cid);
+      }
+    }
+  }
+
+  /**
+   * remove excess replicas if needed, replicationFactor and placement policy
+   * will be taken into consideration.
+   *
+   * @param excess the excess number after subtracting replicationFactor
+   * @param container ContainerInfo
+   * @param eligibleReplicas A list of replicas, which may have excess replicas
+   */
+  private void removeExcessReplicasIfNeeded(int excess,
+                    final ContainerInfo container,
+                    final List<ContainerReplica> eligibleReplicas) {
+    // After removing all unhealthy replicas, if the container is still over
+    // replicated then we need to check if it is already mis-replicated.
+    // If it is, we do no harm by removing excess replicas. However, if it is
+    // not mis-replicated, then we can only remove replicas if they don't
+    // make the container become mis-replicated.
+    if (excess > 0) {
+      Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
+      final int replicationFactor =
+          container.getReplicationConfig().getRequiredNodes();
+      ContainerPlacementStatus ps =
+          getPlacementStatus(eligibleSet, replicationFactor);
+
+      for (ContainerReplica r : eligibleReplicas) {
+        if (excess <= 0) {
+          break;
         }
-        if (excess > 0) {
-          LOG.info("The container {} is over replicated with {} excess " +
-              "replica. The excess replicas cannot be removed without " +
-              "violating the placement policy", container, excess);
+        // First remove the replica we are working on from the set, and then
+        // check if the set is now mis-replicated.
+        eligibleSet.remove(r);
+        ContainerPlacementStatus nowPS =
+            getPlacementStatus(eligibleSet, replicationFactor);
+        if (isPlacementStatusActuallyEqual(ps, nowPS)) {
+          // Remove the replica if the container was already unsatisfied
+          // and losing this replica keep actual placement count unchanged.
+          // OR if losing this replica still keep satisfied
+          sendDeleteCommand(container, r.getDatanodeDetails(), true);
+          excess -= 1;
+          continue;
         }
+        // If we decided not to remove this replica, put it back into the set
+        eligibleSet.add(r);
+      }
+      if (excess > 0) {
+        LOG.info("The container {} is over replicated with {} excess " +
+            "replica. The excess replicas cannot be removed without " +
+            "violating the placement policy", container, excess);
       }
     }
   }
 
   /**
+   * whether the given two ContainerPlacementStatus are actually equal.
+   *
+   * @param cps1 ContainerPlacementStatus
+   * @param cps2 ContainerPlacementStatus
+   */
+  private boolean isPlacementStatusActuallyEqual(
+                      ContainerPlacementStatus cps1,
+                      ContainerPlacementStatus cps2) {
+    return cps1.actualPlacementCount() == cps2.actualPlacementCount() ||
+        cps1.isPolicySatisfied() && cps2.isPolicySatisfied();
+  }
+
+  /**
    * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
    * and then check if the list meets the container placement policy.
    * @param replicas List of containerReplica
@@ -915,7 +1308,8 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   private ContainerPlacementStatus getPlacementStatus(
       Set<ContainerReplica> replicas, int replicationFactor) {
     List<DatanodeDetails> replicaDns = replicas.stream()
-        .map(c -> c.getDatanodeDetails()).collect(Collectors.toList());
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
     return containerPlacement.validateContainerPlacement(
         replicaDns, replicationFactor);
   }
@@ -1155,6 +1549,8 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
             inflightReplication.size())
         .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION,
             inflightDeletion.size())
+        .addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE,
+            inflightMove.size())
         .endRecord();
   }
 
@@ -1250,7 +1646,8 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   public enum ReplicationManagerMetrics implements MetricsInfo {
 
     INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
-    INFLIGHT_DELETION("Tracked inflight container deletion requests.");
+    INFLIGHT_DELETION("Tracked inflight container deletion requests."),
+    INFLIGHT_MOVE("Tracked inflight container move requests.");
 
     private final String desc;
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 38b06e1..d01aafc 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -22,6 +22,7 @@ import com.google.common.primitives.Longs;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManager
     .ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -60,6 +62,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -73,7 +77,8 @@ import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalSt
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
 import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
 import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -92,6 +97,7 @@ public class TestReplicationManager {
   private ContainerManagerV2 containerManager;
   private OzoneConfiguration conf;
   private SCMNodeManager scmNodeManager;
+  private GenericTestUtils.LogCapturer scmLogs;
   private TestClock clock;
 
   @Before
@@ -102,6 +108,7 @@ public class TestReplicationManager {
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         0, TimeUnit.SECONDS);
 
+    scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG);
     containerManager = Mockito.mock(ContainerManagerV2.class);
     nodeManager = new SimpleMockNodeManager();
     eventQueue = new EventQueue();
@@ -178,6 +185,7 @@ public class TestReplicationManager {
         clock);
 
     serviceManager.notifyStatusChanged();
+    scmLogs.clearOutput();
     Thread.sleep(100L);
   }
 
@@ -641,7 +649,7 @@ public class TestReplicationManager {
       throws SCMException, ContainerNotFoundException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final ContainerID id = container.containerID();
-    final Set<ContainerReplica> replicas = getReplicas(id, CLOSED,
+    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
         randomDatanodeDetails(),
         randomDatanodeDetails(),
         randomDatanodeDetails());
@@ -1086,6 +1094,245 @@ public class TestReplicationManager {
     assertReplicaScheduled(0);
   }
 
+  /**
+   * if all the prerequisites are satisfied, move should work as expected.
+   */
+  @Test
+  public void testMove() throws SCMException, NodeNotFoundException,
+      InterruptedException, ExecutionException {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    CompletableFuture<MoveResult> cf =
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(scmLogs.getOutput().contains(
+        "receive a move request about container"));
+    Thread.sleep(100L);
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.replicateContainerCommand, dn3));
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.replicateContainerCommand));
+
+    //replicate container to dn3
+    addReplicaToDn(container, dn3, CLOSED);
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand, 
dn1.getDatanodeDetails()));
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.deleteContainerCommand));
+    containerStateManager.removeContainerReplica(id, dn1);
+
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED);
+  }
+
+  /**
+   * make sure RM does not delete replica if placement policy is not satisfied.
+   */
+  @Test
+  public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
+      throws SCMException, NodeNotFoundException,
+      InterruptedException, ExecutionException {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    ContainerReplica dn2 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    CompletableFuture<MoveResult> cf =
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
+    Assert.assertTrue(scmLogs.getOutput().contains(
+        "receive a move request about container"));
+    Thread.sleep(100L);
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.replicateContainerCommand, dn4));
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.replicateContainerCommand));
+
+    //replicate container to dn4
+    addReplicaToDn(container, dn4, CLOSED);
+    //now, replication succeeds, but replica in dn2 lost,
+    //and there are only three replicas in total, so rm should
+    //not delete the replica on dn1
+    containerStateManager.removeContainerReplica(id, dn2);
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertFalse(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand, 
dn1.getDatanodeDetails()));
+
+    Assert.assertTrue(cf.isDone() && cf.get() == 
MoveResult.DELETE_FAIL_POLICY);
+  }
+
+
+  /**
+   * test src and target datanode become unhealthy when moving.
+   */
+  @Test
+  public void testDnBecameUnhealthyWhenMoving() throws SCMException,
+      NodeNotFoundException, InterruptedException, ExecutionException {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    CompletableFuture<MoveResult> cf =
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(scmLogs.getOutput().contains(
+        "receive a move request about container"));
+
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE));
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    addReplicaToDn(container, dn3, CLOSED);
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+    nodeManager.setNodeStatus(dn1.getDatanodeDetails(),
+        new NodeStatus(IN_SERVICE, STALE));
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+  }
+
+  /**
+   * before Replication Manager generates a CompletableFuture for a move
+   * operation, some prerequisites should be satisfied.
+   */
+  @Test
+  public void testMovePrerequisites()
+      throws SCMException, NodeNotFoundException,
+      InterruptedException, ExecutionException {
+    //all conditions are met
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    ContainerReplica dn2 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    ContainerReplica dn4 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+
+    CompletableFuture<MoveResult> cf;
+    //the above move is executed successfully, so there may be some item in
+    //inflightReplication or inflightDeletion. here we stop replication manager
+    //to clear these states, which may impact the tests below.
+    //we don't need a running replicationManager now
+    replicationManager.stop();
+    Thread.sleep(100L);
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.RM_NOT_RUNNING);
+    replicationManager.start();
+    Thread.sleep(100L);
+
+    //container is not in CLOSED state
+    for (LifeCycleState state : LifeCycleState.values()) {
+      if (state != LifeCycleState.CLOSED) {
+        container.setState(state);
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      }
+    }
+    container.setState(LifeCycleState.CLOSED);
+
+    //Node is not in healthy state
+    for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
+      if (state != HEALTHY) {
+        nodeManager.setNodeStatus(dn3,
+            new NodeStatus(IN_SERVICE, state));
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      }
+    }
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+
+    //Node is not in IN_SERVICE state
+    for (HddsProtos.NodeOperationalState state :
+        HddsProtos.NodeOperationalState.values()) {
+      if (state != IN_SERVICE) {
+        nodeManager.setNodeStatus(dn3,
+            new NodeStatus(state, HEALTHY));
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      }
+    }
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+
+    //container exists in target datanode
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(),
+        dn2.getDatanodeDetails());
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+
+    //container does not exist in source datanode
+    cf = replicationManager.move(id, dn3, dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+
+    replicationManager.start();
+    //make container over-replicated to test the
+    // case that container is in inflightDeletion
+    ContainerReplica dn5 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
+    ContainerReplica dn6 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
+    replicationManager.processContainersNow();
+    //waiting for inflightDeletion generation
+    Thread.sleep(100L);
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+    resetReplicationManager();
+
+    //make the replica num be 2 to test the case
+    //that container is in inflightReplication
+    containerStateManager.removeContainerReplica(id, dn6);
+    containerStateManager.removeContainerReplica(id, dn5);
+    containerStateManager.removeContainerReplica(id, dn4);
+    //replication manager should generate inflightReplication
+    replicationManager.processContainersNow();
+    //waiting for inflightReplication generation
+    Thread.sleep(100L);
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+  }
+
   @Test
   public void testReplicateCommandTimeout() throws
       SCMException, InterruptedException {
@@ -1112,21 +1359,39 @@ public class TestReplicationManager {
     return container;
   }
 
-  private ContainerReplica addReplica(ContainerInfo container,
-      NodeStatus nodeStatus, State replicaState)
-      throws ContainerNotFoundException {
+  private DatanodeDetails addNode(NodeStatus nodeStatus) {
     DatanodeDetails dn = randomDatanodeDetails();
     dn.setPersistedOpState(nodeStatus.getOperationalState());
     dn.setPersistedOpStateExpiryEpochSec(
         nodeStatus.getOpStateExpiryEpochSeconds());
     nodeManager.register(dn, nodeStatus);
+    return dn;
+  }
+
+  private void resetReplicationManager() throws InterruptedException {
+    replicationManager.stop();
+    Thread.sleep(100L);
+    replicationManager.start();
+    Thread.sleep(100L);
+  }
+
+  private ContainerReplica addReplica(ContainerInfo container,
+      NodeStatus nodeStatus, State replicaState)
+      throws ContainerNotFoundException {
+    DatanodeDetails dn = addNode(nodeStatus);
+    return addReplicaToDn(container, dn, replicaState);
+  }
+
+  private ContainerReplica addReplicaToDn(ContainerInfo container,
+                               DatanodeDetails dn, State replicaState)
+      throws ContainerNotFoundException {
     // Using the same originID for all replica in the container set. If each
     // replica has a unique originID, it causes problems in ReplicationManager
     // when processing over-replicated containers.
     final UUID originNodeId =
         UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
     final ContainerReplica replica = getReplicas(
-        container.containerID(), CLOSED, 1000L, originNodeId, dn);
+        container.containerID(), replicaState, 1000L, originNodeId, dn);
     containerStateManager
         .updateContainerReplica(container.containerID(), replica);
     return replica;
@@ -1196,5 +1461,4 @@ public class TestReplicationManager {
               dc.getDatanodeId().equals(datanode.getUuid()));
     }
   }
-
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to