JacksonYao287 commented on a change in pull request #2349:
URL: https://github.com/apache/ozone/pull/2349#discussion_r670899224



##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -861,47 +1168,145 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
           break;
         }
       }
-      // After removing all unhealthy replicas, if the container is still over
-      // replicated then we need to check if it is already mis-replicated.
-      // If it is, we do no harm by removing excess replicas. However, if it is
-      // not mis-replicated, then we can only remove replicas if they don't
-      // make the container become mis-replicated.
-      if (excess > 0) {
-        eligibleReplicas.removeAll(unhealthyReplicas);
-        Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
-        ContainerPlacementStatus ps =
-            getPlacementStatus(eligibleSet, replicationFactor);
-        for (ContainerReplica r : eligibleReplicas) {
-          if (excess <= 0) {
-            break;
-          }
-          // First remove the replica we are working on from the set, and then
-          // check if the set is now mis-replicated.
-          eligibleSet.remove(r);
-          ContainerPlacementStatus nowPS =
-              getPlacementStatus(eligibleSet, replicationFactor);
-          if ((!ps.isPolicySatisfied()
-                && nowPS.actualPlacementCount() == ps.actualPlacementCount())
-              || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) {
-            // Remove the replica if the container was already unsatisfied
-            // and losing this replica keep actual placement count unchanged.
-            // OR if losing this replica still keep satisfied
-            sendDeleteCommand(container, r.getDatanodeDetails(), true);
-            excess -= 1;
-            continue;
-          }
-          // If we decided not to remove this replica, put it back into the set
-          eligibleSet.add(r);
+      eligibleReplicas.removeAll(unhealthyReplicas);
+
+      excess -= handleMoveIfNeeded(container, eligibleReplicas);
+
+      removeExcessReplicasIfNeeded(excess, container, eligibleReplicas);
+    }
+  }
+
+  /**
+   * if the container is in inflightMove, handle move if needed.
+   *
+   * @param cif ContainerInfo
+   * @param eligibleReplicas An list of replicas, which may have excess replicas
+   * @return minus how many replica is removed through sending delete command
+   */
+  private int handleMoveIfNeeded(final ContainerInfo cif,
+                   final List<ContainerReplica> eligibleReplicas) {
+    int minus = 0;
+    final ContainerID cid = cif.containerID();
+    if (!inflightMove.containsKey(cid)) {
+      return minus;
+    }
+
+    Pair<DatanodeDetails, DatanodeDetails> movePair =
+        inflightMove.get(cid);
+
+    final DatanodeDetails srcDn = movePair.getKey();
+    final DatanodeDetails targetDn = movePair.getValue();
+    boolean isSourceDnInReplicaSet;
+    boolean isTargetDnInReplicaSet;
+
+    isSourceDnInReplicaSet = eligibleReplicas.stream()
+        .anyMatch(r -> r.getDatanodeDetails().equals(srcDn));
+    isTargetDnInReplicaSet = eligibleReplicas.stream()
+        .anyMatch(r -> r.getDatanodeDetails().equals(targetDn));
+
+    // if target datanode is not in replica set , nothing to do
+    if (isTargetDnInReplicaSet) {
+      Set<ContainerReplica> eligibleReplicaSet =
+          eligibleReplicas.stream().collect(Collectors.toSet());
+      int replicationFactor =
+          cif.getReplicationConfig().getRequiredNodes();
+      ContainerPlacementStatus currentCPS =
+          getPlacementStatus(eligibleReplicaSet, replicationFactor);
+      eligibleReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+      ContainerPlacementStatus afterMoveCPS =
+          getPlacementStatus(eligibleReplicaSet, replicationFactor);
+
+      if (isSourceDnInReplicaSet &&
+          isPlacementStatusActuallyEqual(currentCPS, afterMoveCPS)) {
+        sendDeleteCommand(cif, srcDn, true);
+        eligibleReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+        minus++;
+      } else {
+        if (!isSourceDnInReplicaSet) {
+          // if the target is present, and source disappears somehow,
+          // we can consider move is successful.
+          inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED);
+        } else {
+          // if source and target datanode are both in the replicaset,
+          // but we can not delete source datanode for now (e.g.,
+          // there is only 3 replicas or not policy-statisfied , etc.),
+          // we just complete the future without sending a delete command.
+          LOG.info("can not remove source replica after successfully " +
+              "replicated to target datanode");
+          inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY);
         }
-        if (excess > 0) {
-          LOG.info("The container {} is over replicated with {} excess " +
-              "replica. The excess replicas cannot be removed without " +
-              "violating the placement policy", container, excess);
+        inflightMove.remove(cid);
+        inflightMoveFuture.remove(cid);
+      }
+    }
+
+    return minus;
+  }
+
+  /**
+   * remove execess replicas if needed, replicationFactor and placement policy
+   * will be take into consideration.
+   *
+   * @param excess the excess number after subtracting replicationFactor
+   * @param container ContainerInfo
+   * @param eligibleReplicas An list of replicas, which may have excess replicas
+   */
+  private void removeExcessReplicasIfNeeded(int excess,
+                    final ContainerInfo container,
+                    final List<ContainerReplica> eligibleReplicas) {
+    // After removing all unhealthy replicas, if the container is still over
+    // replicated then we need to check if it is already mis-replicated.
+    // If it is, we do no harm by removing excess replicas. However, if it is
+    // not mis-replicated, then we can only remove replicas if they don't
+    // make the container become mis-replicated.
+    if (excess > 0) {
+      Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
+      final int replicationFactor =
+          container.getReplicationConfig().getRequiredNodes();
+      ContainerPlacementStatus ps =
+          getPlacementStatus(eligibleSet, replicationFactor);
+
+      for (ContainerReplica r : eligibleReplicas) {
+        if (excess <= 0) {
+          break;
         }
+        // First remove the replica we are working on from the set, and then
+        // check if the set is now mis-replicated.
+        eligibleSet.remove(r);
+        ContainerPlacementStatus nowPS =
+            getPlacementStatus(eligibleSet, replicationFactor);
+        if (isPlacementStatusActuallyEqual(ps, nowPS)) {
+          // Remove the replica if the container was already unsatisfied
+          // and losing this replica keep actual placement count unchanged.
+          // OR if losing this replica still keep satisfied
+          sendDeleteCommand(container, r.getDatanodeDetails(), true);
+          excess -= 1;
+          continue;
+        }
+        // If we decided not to remove this replica, put it back into the set
+        eligibleSet.add(r);
+      }
+      if (excess > 0) {
+        LOG.info("The container {} is over replicated with {} excess " +
+            "replica. The excess replicas cannot be removed without " +
+            "violating the placement policy", container, excess);
       }
     }
   }
 
+  /**
+   * whether the given two ContainerPlacementStatus are actually equal.
+   *
+   * @param cps1 ContainerPlacementStatus
+   * @param cps2 ContainerPlacementStatus
+   */
+  private boolean isPlacementStatusActuallyEqual(
+                      ContainerPlacementStatus cps1,
+                      ContainerPlacementStatus cps2) {
+    return cps1.actualPlacementCount() == cps2.actualPlacementCount() ||
+        cps1.isPolicySatisfied() && cps2.isPolicySatisfied();

Review comment:
       Yeah, that makes sense; I will add it back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to