This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4ccc128 HDDS-4928. Support container move in Replication Manager
(#2349)
4ccc128 is described below
commit 4ccc1281389344cb660cdb1d8d8119addb2f770c
Author: Jackson Yao <[email protected]>
AuthorDate: Mon Jul 19 14:18:18 2021 +0800
HDDS-4928. Support container move in Replication Manager (#2349)
---
.../hdds/scm/container/ReplicationManager.java | 485 +++++++++++++++++++--
.../hdds/scm/container/TestReplicationManager.java | 278 +++++++++++-
2 files changed, 712 insertions(+), 51 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 2903653..d647055 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -40,6 +41,8 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -50,7 +53,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
-import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -136,6 +140,64 @@ public class ReplicationManager implements MetricsSource,
SCMService {
private final Map<ContainerID, List<InflightAction>> inflightDeletion;
/**
+ * Tracks container moves that are not yet complete: maps each container
+ * to the (source, target) datanode pair of its move.
+ */
+ private final Map<ContainerID,
+ Pair<DatanodeDetails, DatanodeDetails>> inflightMove;
+
+ /**
+ * Result of a move operation together with the corresponding reason.
+ * It is used to complete the future returned for a move request, so the
+ * outcome of the move can be tracked.
+ */
+ enum MoveResult {
+ // both replication and deletion are completed
+ COMPLETED,
+ // RM is not running
+ RM_NOT_RUNNING,
+ // replication failed because the container does not exist in source
+ REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
+ // replication failed because the container already exists in target
+ REPLICATION_FAIL_EXIST_IN_TARGET,
+ // replication failed because the container is not closed
+ REPLICATION_FAIL_CONTAINER_NOT_CLOSED,
+ // replication failed because the container is in inflightDeletion
+ REPLICATION_FAIL_INFLIGHT_DELETION,
+ // replication failed because the container is in inflightReplication
+ REPLICATION_FAIL_INFLIGHT_REPLICATION,
+ // replication failed because of timeout
+ REPLICATION_FAIL_TIME_OUT,
+ // replication failed because a datanode is not in service
+ REPLICATION_FAIL_NODE_NOT_IN_SERVICE,
+ // replication failed because a datanode is unhealthy
+ REPLICATION_FAIL_NODE_UNHEALTHY,
+ // replication succeeded, but deletion failed because of timeout
+ DELETION_FAIL_TIME_OUT,
+ // replication succeeded, but deletion failed because the
+ // node is unhealthy
+ DELETION_FAIL_NODE_UNHEALTHY,
+ // replication succeeded, but deleting the container from the
+ // source datanode would leave the policy (e.g. replica count or
+ // rack location) unsatisfied, so the source replica is not
+ // deleted
+ DELETE_FAIL_POLICY,
+ // replicas + target - source does not satisfy the placement policy
+ PLACEMENT_POLICY_NOT_SATISFIED,
+ // unexpected action: source removed from inflightReplication
+ UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION,
+ // unexpected action: target removed from inflightDeletion
+ UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION
+ }
+
+ /**
+ * Futures returned by move requests that are not yet complete, keyed by
+ * container; each one is completed with a MoveResult when its move ends.
+ * NOTE(review): stop() clears this map without completing the pending
+ * futures.
+ */
+ private final Map<ContainerID,
+ CompletableFuture<MoveResult>> inflightMoveFuture;
+
+ /**
* ReplicationManager specific configuration.
*/
private final ReplicationManagerConfiguration rmConf;
@@ -194,6 +256,8 @@ public class ReplicationManager implements MetricsSource,
SCMService {
this.running = false;
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
+ this.inflightMove = new ConcurrentHashMap<>();
+ this.inflightMoveFuture = new ConcurrentHashMap<>();
this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
this.clock = clock;
@@ -214,7 +278,6 @@ public class ReplicationManager implements MetricsSource,
SCMService {
*/
@Override
public synchronized void start() {
-
if (!isRunning()) {
DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
"SCM Replication manager (closed container replication) related "
@@ -264,6 +327,9 @@ public class ReplicationManager implements MetricsSource,
SCMService {
LOG.info("Stopping Replication Monitor Thread.");
inflightReplication.clear();
inflightDeletion.clear();
+ //TODO: replicate inflight move through ratis
+ inflightMove.clear();
+ inflightMoveFuture.clear();
running = false;
DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
notifyAll();
@@ -458,13 +524,17 @@ public class ReplicationManager implements MetricsSource,
SCMService {
try {
InflightAction a = iter.next();
NodeStatus status = nodeManager.getNodeStatus(a.datanode);
- NodeState state = status.getHealth();
- NodeOperationalState opState = status.getOperationalState();
- if (state != NodeState.HEALTHY || a.time < deadline ||
- filter.test(a) || opState != NodeOperationalState.IN_SERVICE) {
+ boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
+ boolean isCompleted = filter.test(a);
+ boolean isTimeout = a.time < deadline;
+ boolean isNotInService = status.getOperationalState() !=
+ NodeOperationalState.IN_SERVICE;
+ if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
iter.remove();
+ updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
+ container, a.datanode, inflightActions);
}
- } catch (NodeNotFoundException e) {
+ } catch (NodeNotFoundException | ContainerNotFoundException e) {
// Should not happen, but if it does, just remove the action as the
// node somehow does not exist;
iter.remove();
@@ -477,6 +547,247 @@ public class ReplicationManager implements MetricsSource,
SCMService {
}
/**
+   * Updates the inflight move of a container, if any, after the action on
+   * {@code dn} has been removed from {@code inflightActions}.
+   *
+   * <p>Nothing happens if the container is not being moved or if
+   * {@code dn} is neither the source nor the target of the move. When the
+   * replication to the target completed, the deletion of the source
+   * replica is handled by {@link #deleteSrcDnForMove}; in every other case
+   * the move stops being tracked and its future is completed with the
+   * matching {@link MoveResult}.
+   *
+   * @param isUnhealthy is the datanode unhealthy
+   * @param isCompleted is the action completed
+   * @param isTimeout is the action timeout
+   * @param container Container to update
+   * @param dn datanode which is removed from the inflightActions
+   * @param inflightActions inflightReplication (or) inflightDeletion
+   * @throws ContainerNotFoundException propagated from
+   *         {@code containerManager.getContainerReplicas}
+   */
+  private void updateMoveIfNeeded(final boolean isUnhealthy,
+      final boolean isCompleted, final boolean isTimeout,
+      final ContainerInfo container, final DatanodeDetails dn,
+      final Map<ContainerID,
+          List<InflightAction>> inflightActions)
+      throws ContainerNotFoundException {
+    final ContainerID id = container.containerID();
+    // A single lookup (instead of containsKey + get) so that a concurrent
+    // removal of the entry cannot lead to a NullPointerException.
+    final Pair<DatanodeDetails, DatanodeDetails> kv = inflightMove.get(id);
+    if (kv == null) {
+      return;
+    }
+
+    // make sure the datanode, which is removed from inflightActions,
+    // is the source or the target datanode of the move.
+    final boolean isSource = kv.getKey().equals(dn);
+    final boolean isTarget = kv.getValue().equals(dn);
+    if (!isSource && !isTarget) {
+      return;
+    }
+    // Identity check: Map#equals compares the contents of both maps, which
+    // is O(n) per call and can match the wrong map when contents coincide.
+    final boolean isInflightReplication =
+        inflightActions == inflightReplication;
+
+    /*
+     * There are four cases:
+     * +----------------+---------------------+------------------+
+     * |                | inflightReplication | inflightDeletion |
+     * +----------------+---------------------+------------------+
+     * | source removed | unexpected          | expected         |
+     * | target removed | expected            | unexpected       |
+     * +----------------+---------------------+------------------+
+     * An unexpected action may happen somehow. To keep the outcome
+     * deterministic, the move future is completed with the matching
+     * UNEXPECTED_* result in that case.
+     */
+    if (isSource && isInflightReplication) {
+      completeInflightMove(id,
+          MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION);
+      return;
+    }
+    if (isTarget && !isInflightReplication) {
+      completeInflightMove(id,
+          MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION);
+      return;
+    }
+
+    if (isInflightReplication) {
+      // the replication to the target datanode ended
+      if (isCompleted) {
+        deleteSrcDnForMove(container,
+            containerManager.getContainerReplicas(id));
+      } else if (isUnhealthy) {
+        completeInflightMove(id, MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      } else if (isTimeout) {
+        completeInflightMove(id, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else {
+        // The caller only removes an action when it is completed, the node
+        // is unhealthy, the action timed out or the node is no longer
+        // IN_SERVICE, so the last reason is the only one left here.
+        completeInflightMove(id,
+            MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      }
+      return;
+    }
+
+    // the deletion on the source datanode ended
+    if (isUnhealthy) {
+      completeInflightMove(id, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+    } else if (isTimeout) {
+      completeInflightMove(id, MoveResult.DELETION_FAIL_TIME_OUT);
+    } else {
+      // NOTE(review): this branch is also reached when the source datanode
+      // left IN_SERVICE before the deletion was confirmed; there is no
+      // dedicated MoveResult for that case yet.
+      completeInflightMove(id, MoveResult.COMPLETED);
+    }
+  }
+
+  /**
+   * Stops tracking the move of the given container, then completes its
+   * future (if one is still registered) with the given result. Tracking is
+   * removed first so that callbacks attached to the future observe a state
+   * in which the move is no longer in flight.
+   *
+   * @param id container whose move ended
+   * @param result outcome of the move
+   */
+  private void completeInflightMove(final ContainerID id,
+      final MoveResult result) {
+    inflightMove.remove(id);
+    final CompletableFuture<MoveResult> future =
+        inflightMoveFuture.remove(id);
+    if (future != null) {
+      future.complete(result);
+    }
+  }
+
+ /**
+ * Requests moving a replica of the given container from {@code srcDn}
+ * to {@code targetDn}. The move is a replication to the target followed
+ * by a deletion of the source replica; the deletion is skipped if it
+ * would leave the placement policy unsatisfied.
+ *
+ * @param cid Container to move
+ * @param srcDn datanode to move from
+ * @param targetDn datanode to move to
+ * @return a future completed with the {@link MoveResult} of the move; it
+ * is already completed when the request is rejected up front
+ * @throws ContainerNotFoundException propagated from the container
+ * manager lookups of {@code cid}
+ * @throws NodeNotFoundException propagated from the node status lookups
+ * of {@code srcDn} or {@code targetDn}
+ */
+ public CompletableFuture<MoveResult> move(ContainerID cid,
+ DatanodeDetails srcDn, DatanodeDetails targetDn)
+ throws ContainerNotFoundException, NodeNotFoundException {
+ CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+ if (!isRunning()) {
+ ret.complete(MoveResult.RM_NOT_RUNNING);
+ return ret;
+ }
+
+ /*
+ * make sure the following conditions are met:
+ * 1 the given two datanodes are in healthy state
+ * 2 the given container exists on the given source datanode
+ * 3 the given container does not exist on the given target datanode
+ * 4 the given container is in closed state
+ * 5 the given container is not taking any inflight action
+ * 6 the given two datanodes are in IN_SERVICE state
+ * 7 {Existing replicas + Target_Dn - Source_Dn} satisfies
+ * the placement policy
+ *
+ * move is a combination of two steps: replication and deletion.
+ * if the conditions above are all met, then we take a conservative
+ * strategy here: replication can always be executed, but the execution
+ * of deletion always depends on the placement policy.
+ * Each failed check completes the returned future immediately with the
+ * corresponding MoveResult; the order of the checks decides which
+ * result is reported when several conditions fail.
+ */
+
+ NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
+ NodeState healthStat = currentNodeStat.getHealth();
+ NodeOperationalState operationalState =
+ currentNodeStat.getOperationalState();
+ if (healthStat != NodeState.HEALTHY) {
+ ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+ return ret;
+ }
+ if (operationalState != NodeOperationalState.IN_SERVICE) {
+ ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+ return ret;
+ }
+
+ currentNodeStat = nodeManager.getNodeStatus(targetDn);
+ healthStat = currentNodeStat.getHealth();
+ operationalState = currentNodeStat.getOperationalState();
+ if (healthStat != NodeState.HEALTHY) {
+ ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+ return ret;
+ }
+ if (operationalState != NodeOperationalState.IN_SERVICE) {
+ ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+ return ret;
+ }
+
+ // we need to synchronize on ContainerInfo, since it is
+ // shared by ICR/FCR handler and this.processContainer
+ // TODO: use a Read lock after introducing a RW lock into ContainerInfo
+ ContainerInfo cif = containerManager.getContainer(cid);
+ synchronized (cif) {
+ final Set<ContainerReplica> currentReplicas = containerManager
+ .getContainerReplicas(cid);
+ final Set<DatanodeDetails> replicas = currentReplicas.stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toSet());
+ if (replicas.contains(targetDn)) {
+ ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+ return ret;
+ }
+ if (!replicas.contains(srcDn)) {
+ ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+ return ret;
+ }
+
+ /*
+ * the reason why the given container should not be taking any inflight
+ * action is that: if the given container is being replicated or deleted,
+ * the number of its replicas is not deterministic, so a move operation
+ * issued by the balancer may cause a nondeterministic result, so we
+ * drop this operation for this time.
+ * */
+
+ if (inflightReplication.containsKey(cid)) {
+ ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+ return ret;
+ }
+ if (inflightDeletion.containsKey(cid)) {
+ ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+ return ret;
+ }
+
+ /*
+ * here, there is no need to check whether cid is in inflightMove:
+ * these three maps are all updated while synchronized on ContainerInfo,
+ * so if cid is in inflightMove, it must now be replicated or deleted,
+ * so it must be in inflightReplication or in inflightDeletion.
+ * thus, if we can not find cid in either of them, this cid must
+ * not be in inflightMove.
+ * NOTE(review): this relies on every update of these maps for a
+ * container happening under its ContainerInfo lock; if that invariant
+ * is ever broken, the putIfAbsent calls below keep the existing entries
+ * and the returned future is not tracked - confirm.
+ */
+
+ LifeCycleState currentContainerStat = cif.getState();
+ if (currentContainerStat != LifeCycleState.CLOSED) {
+ ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+ return ret;
+ }
+
+ // check whether {Existing replicas + Target_Dn - Source_Dn}
+ // satisfies current placement policy
+ if (!isPolicySatisfiedAfterMove(cif, srcDn, targetDn,
+ currentReplicas.stream().collect(Collectors.toList()))) {
+ ret.complete(MoveResult.PLACEMENT_POLICY_NOT_SATISFIED);
+ return ret;
+ }
+
+ // all checks passed: start tracking the move, then ask the target
+ // to replicate the container from the source
+ inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn));
+ inflightMoveFuture.putIfAbsent(cid, ret);
+ sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn));
+ }
+ // only accepted requests reach this point and are logged
+ LOG.info("receive a move request about container {} , from {} to {}",
+ cid, srcDn.getUuid(), targetDn.getUuid());
+ return ret;
+ }
+
+  /**
+   * Checks the placement policy against the replica set the container
+   * would have after the move, i.e. the existing replicas with the one on
+   * the source datanode replaced by a closed replica on the target:
+   * {Existing replicas + Target_Dn - Source_Dn}.
+   *
+   * @param cif Container Info of moved container
+   * @param srcDn DatanodeDetails of source data node
+   * @param targetDn DatanodeDetails of target data node
+   * @param replicas container replicas
+   * @return true if the placement policy is satisfied after the move
+   */
+  private boolean isPolicySatisfiedAfterMove(ContainerInfo cif,
+      DatanodeDetails srcDn, DatanodeDetails targetDn,
+      final List<ContainerReplica> replicas) {
+    final Set<ContainerReplica> afterMove = new HashSet<>(replicas);
+    // drop whatever replica lives on the source datanode ...
+    afterMove.removeIf(
+        replica -> replica.getDatanodeDetails().equals(srcDn));
+    // ... and account for the closed replica the target will hold
+    final ContainerReplica targetReplica = ContainerReplica.newBuilder()
+        .setDatanodeDetails(targetDn)
+        .setContainerID(cif.containerID())
+        .setContainerState(State.CLOSED)
+        .build();
+    afterMove.add(targetReplica);
+    final int requiredNodes = cif.getReplicationConfig().getRequiredNodes();
+    return getPlacementStatus(afterMove, requiredNodes).isPolicySatisfied();
+  }
+
+ /**
* Returns the number replica which are pending creation for the given
* container ID.
* @param id The ContainerID for which to check the pending replica
@@ -864,48 +1175,130 @@ public class ReplicationManager implements
MetricsSource, SCMService {
break;
}
}
- // After removing all unhealthy replicas, if the container is still over
- // replicated then we need to check if it is already mis-replicated.
- // If it is, we do no harm by removing excess replicas. However, if it is
- // not mis-replicated, then we can only remove replicas if they don't
- // make the container become mis-replicated.
- if (excess > 0) {
- eligibleReplicas.removeAll(unhealthyReplicas);
- Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
- ContainerPlacementStatus ps =
- getPlacementStatus(eligibleSet, replicationFactor);
- for (ContainerReplica r : eligibleReplicas) {
- if (excess <= 0) {
- break;
- }
- // First remove the replica we are working on from the set, and then
- // check if the set is now mis-replicated.
- eligibleSet.remove(r);
- ContainerPlacementStatus nowPS =
- getPlacementStatus(eligibleSet, replicationFactor);
- if ((!ps.isPolicySatisfied()
- && nowPS.actualPlacementCount() == ps.actualPlacementCount())
- || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) {
- // Remove the replica if the container was already unsatisfied
- // and losing this replica keep actual placement count unchanged.
- // OR if losing this replica still keep satisfied
- sendDeleteCommand(container, r.getDatanodeDetails(), true);
- excess -= 1;
- continue;
- }
- // If we decided not to remove this replica, put it back into the set
- eligibleSet.add(r);
+ eligibleReplicas.removeAll(unhealthyReplicas);
+ removeExcessReplicasIfNeeded(excess, container, eligibleReplicas);
+ }
+ }
+
+  /**
+   * Handles the deletion step of a move, assuming the replication of the
+   * container to the target datanode has already completed.
+   *
+   * <p>If the container is being moved, a delete command is sent to the
+   * source datanode when the container is over-replicated and removing the
+   * source replica keeps the placement status as good as it is. Otherwise
+   * the move stops being tracked and its future is completed: with
+   * COMPLETED if the source replica is already gone, with
+   * DELETE_FAIL_POLICY if the source replica has to be kept.
+   *
+   * @param cif ContainerInfo
+   * @param replicaSet A set of replicas, which may have excess replicas
+   */
+  private void deleteSrcDnForMove(final ContainerInfo cif,
+      final Set<ContainerReplica> replicaSet) {
+    final ContainerID cid = cif.containerID();
+    // A single lookup (instead of containsKey + get) so that a concurrent
+    // removal of the entry cannot lead to a NullPointerException.
+    final Pair<DatanodeDetails, DatanodeDetails> movePair =
+        inflightMove.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails srcDn = movePair.getKey();
+    final ContainerReplicaCount replicaCount =
+        getContainerReplicaCount(cif, replicaSet);
+
+    final MoveResult result;
+    if (replicaSet.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(srcDn))) {
+      // if the target is present but source disappears somehow,
+      // we can consider move is successful.
+      result = MoveResult.COMPLETED;
+    } else {
+      final int replicationFactor =
+          cif.getReplicationConfig().getRequiredNodes();
+      final ContainerPlacementStatus currentCPS =
+          getPlacementStatus(replicaSet, replicationFactor);
+      final Set<ContainerReplica> newReplicaSet = new HashSet<>(replicaSet);
+      newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+      final ContainerPlacementStatus newCPS =
+          getPlacementStatus(newReplicaSet, replicationFactor);
+
+      if (replicaCount.isOverReplicated() &&
+          isPlacementStatusActuallyEqual(currentCPS, newCPS)) {
+        // the move stays tracked until the deletion of the source ends
+        sendDeleteCommand(cif, srcDn, true);
+        return;
+      }
+      // if source and target datanode are both in the replicaset,
+      // but we can not delete source datanode for now (e.g.,
+      // there are only 3 replicas or the policy is not satisfied, etc.),
+      // we just complete the future without sending a delete command.
+      LOG.info("can not remove source replica after successfully " +
+          "replicated to target datanode");
+      result = MoveResult.DELETE_FAIL_POLICY;
+    }
+
+    // Stop tracking before completing, so that callbacks attached to the
+    // future run against a state where this move is no longer in flight.
+    inflightMove.remove(cid);
+    final CompletableFuture<MoveResult> future =
+        inflightMoveFuture.remove(cid);
+    if (future != null) {
+      future.complete(result);
+    }
+  }
+
+ /**
+ * Removes excess replicas if needed; the replication factor and the
+ * placement policy are taken into consideration.
+ *
+ * @param excess the excess number after subtracting replicationFactor
+ * @param container ContainerInfo
+ * @param eligibleReplicas A list of replicas, which may have excess replicas
+ */
+ private void removeExcessReplicasIfNeeded(int excess,
+ final ContainerInfo container,
+ final List<ContainerReplica> eligibleReplicas) {
+ // After removing all unhealthy replicas, if the container is still over
+ // replicated then we need to check if it is already mis-replicated.
+ // If it is, we do no harm by removing excess replicas. However, if it is
+ // not mis-replicated, then we can only remove replicas if they don't
+ // make the container become mis-replicated.
+ if (excess > 0) {
+ Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
+ final int replicationFactor =
+ container.getReplicationConfig().getRequiredNodes();
+ ContainerPlacementStatus ps =
+ getPlacementStatus(eligibleSet, replicationFactor);
+
+ for (ContainerReplica r : eligibleReplicas) {
+ if (excess <= 0) {
+ break;
}
- if (excess > 0) {
- LOG.info("The container {} is over replicated with {} excess " +
- "replica. The excess replicas cannot be removed without " +
- "violating the placement policy", container, excess);
+ // First remove the replica we are working on from the set, and then
+ // check if the set is now mis-replicated.
+ eligibleSet.remove(r);
+ ContainerPlacementStatus nowPS =
+ getPlacementStatus(eligibleSet, replicationFactor);
+ if (isPlacementStatusActuallyEqual(ps, nowPS)) {
+ // Remove the replica if losing it does not make the placement
+ // worse, as judged by isPlacementStatusActuallyEqual (e.g. it was
+ // already unsatisfied and the actual placement count is unchanged,
+ // or it stays satisfied).
+ sendDeleteCommand(container, r.getDatanodeDetails(), true);
+ excess -= 1;
+ continue;
}
+ // If we decided not to remove this replica, put it back into the set
+ eligibleSet.add(r);
+ }
+ if (excess > 0) {
+ LOG.info("The container {} is over replicated with {} excess " +
+ "replica. The excess replicas cannot be removed without " +
+ "violating the placement policy", container, excess);
}
}
}
/**
+   * Returns whether removing a replica keeps the placement as good as it
+   * was, comparing the status before ({@code cps1}) and after
+   * ({@code cps2}) the removal.
+   *
+   * <p>This holds when the policy is satisfied both before and after, or
+   * when it was already unsatisfied and the actual placement count is
+   * unchanged. A satisfied placement that becomes unsatisfied is never
+   * accepted, even if its actual placement count stays the same; this is
+   * the rule the excess-replica removal applied before it was extracted
+   * into this helper.
+   *
+   * @param cps1 ContainerPlacementStatus before removing the replica
+   * @param cps2 ContainerPlacementStatus after removing the replica
+   * @return true if the removal does not degrade the placement
+   */
+  private boolean isPlacementStatusActuallyEqual(
+      ContainerPlacementStatus cps1,
+      ContainerPlacementStatus cps2) {
+    if (cps1.isPolicySatisfied()) {
+      return cps2.isPolicySatisfied();
+    }
+    return cps1.actualPlacementCount() == cps2.actualPlacementCount();
+  }
+
+ /**
* Given a set of ContainerReplica, transform it to a list of DatanodeDetails
* and then check if the list meets the container placement policy.
* @param replicas List of containerReplica
@@ -915,7 +1308,8 @@ public class ReplicationManager implements MetricsSource,
SCMService {
private ContainerPlacementStatus getPlacementStatus(
Set<ContainerReplica> replicas, int replicationFactor) {
+ // the placement policy validates datanodes, so collect the datanode
+ // hosting each replica before delegating to it
List<DatanodeDetails> replicaDns = replicas.stream()
- .map(c -> c.getDatanodeDetails()).collect(Collectors.toList());
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
return containerPlacement.validateContainerPlacement(
replicaDns, replicationFactor);
}
@@ -1155,6 +1549,8 @@ public class ReplicationManager implements MetricsSource,
SCMService {
inflightReplication.size())
.addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION,
inflightDeletion.size())
+ .addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE,
+ inflightMove.size())
.endRecord();
}
@@ -1250,7 +1646,8 @@ public class ReplicationManager implements MetricsSource,
SCMService {
public enum ReplicationManagerMetrics implements MetricsInfo {
INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
- INFLIGHT_DELETION("Tracked inflight container deletion requests.");
+ INFLIGHT_DELETION("Tracked inflight container deletion requests."),
+ INFLIGHT_MOVE("Tracked inflight container move requests.");
private final String desc;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 38b06e1..d01aafc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -22,6 +22,7 @@ import com.google.common.primitives.Longs;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManager
.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -60,6 +62,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,7 +77,8 @@ import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalSt
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -92,6 +97,7 @@ public class TestReplicationManager {
private ContainerManagerV2 containerManager;
private OzoneConfiguration conf;
private SCMNodeManager scmNodeManager;
+ private GenericTestUtils.LogCapturer scmLogs;
private TestClock clock;
@Before
@@ -102,6 +108,7 @@ public class TestReplicationManager {
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, TimeUnit.SECONDS);
+ scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG);
containerManager = Mockito.mock(ContainerManagerV2.class);
nodeManager = new SimpleMockNodeManager();
eventQueue = new EventQueue();
@@ -178,6 +185,7 @@ public class TestReplicationManager {
clock);
serviceManager.notifyStatusChanged();
+ scmLogs.clearOutput();
Thread.sleep(100L);
}
@@ -641,7 +649,7 @@ public class TestReplicationManager {
throws SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
- final Set<ContainerReplica> replicas = getReplicas(id, CLOSED,
+ final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
randomDatanodeDetails(),
randomDatanodeDetails(),
randomDatanodeDetails());
@@ -1086,6 +1094,245 @@ public class TestReplicationManager {
assertReplicaScheduled(0);
}
+ /**
+ * If all the prerequisites are satisfied, a move should replicate the
+ * container to the target, delete it from the source and then complete.
+ */
+ @Test
+ public void testMove() throws SCMException, NodeNotFoundException,
+ InterruptedException, ExecutionException {
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ ContainerID id = container.containerID();
+ ContainerReplica dn1 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+ CompletableFuture<MoveResult> cf =
+ replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(scmLogs.getOutput().contains(
+ "receive a move request about container"));
+ Thread.sleep(100L);
+ // the move should trigger exactly one replicate command, sent to dn3
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.replicateContainerCommand, dn3));
+ Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+
+ // simulate the new replica showing up on dn3, then let RM re-process
+ addReplicaToDn(container, dn3, CLOSED);
+ replicationManager.processContainersNow();
+ Thread.sleep(100L);
+
+ // with the target replica in place, RM should ask dn1 to delete its copy
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
dn1.getDatanodeDetails()));
+ Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+ // simulate the replica on dn1 being removed
+ containerStateManager.removeContainerReplica(id, dn1);
+
+ // once the source replica is gone, the move should report COMPLETED
+ replicationManager.processContainersNow();
+ Thread.sleep(100L);
+
+ Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED);
+ }
+
+ /**
+ * Make sure RM does not delete the source replica of a move when the
+ * placement policy would not be satisfied after that deletion.
+ */
+ @Test
+ public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
+ throws SCMException, NodeNotFoundException,
+ InterruptedException, ExecutionException {
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ ContainerID id = container.containerID();
+ ContainerReplica dn1 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ ContainerReplica dn2 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+ CompletableFuture<MoveResult> cf =
+ replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
+ Assert.assertTrue(scmLogs.getOutput().contains(
+ "receive a move request about container"));
+ Thread.sleep(100L);
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.replicateContainerCommand, dn4));
+ Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+
+ // simulate the new replica showing up on dn4
+ addReplicaToDn(container, dn4, CLOSED);
+ // replication to dn4 has succeeded, but the replica on dn2 is lost,
+ // so there are only three replicas in total and RM must not delete
+ // the replica on dn1
+ containerStateManager.removeContainerReplica(id, dn2);
+ replicationManager.processContainersNow();
+ Thread.sleep(100L);
+
+ Assert.assertFalse(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
dn1.getDatanodeDetails()));
+
+ // the move should end with DELETE_FAIL_POLICY instead of COMPLETED
+ Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.DELETE_FAIL_POLICY);
+ }
+
+
+ /**
+ * Test that a move fails when the target datanode becomes unhealthy
+ * before replication finishes, or when the source datanode becomes
+ * unhealthy before its replica is deleted.
+ */
+ @Test
+ public void testDnBecameUnhealthyWhenMoving() throws SCMException,
+ NodeNotFoundException, InterruptedException, ExecutionException {
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ ContainerID id = container.containerID();
+ ContainerReplica dn1 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+ CompletableFuture<MoveResult> cf =
+ replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(scmLogs.getOutput().contains(
+ "receive a move request about container"));
+
+ // the target (dn3) turns STALE before the replica reaches it
+ nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE));
+ replicationManager.processContainersNow();
+ Thread.sleep(100L);
+
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+
+ // retry with a healthy target; this time the source (dn1) turns STALE
+ // after the new replica has been reported on dn3
+ nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ addReplicaToDn(container, dn3, CLOSED);
+ replicationManager.processContainersNow();
+ Thread.sleep(100L);
+ nodeManager.setNodeStatus(dn1.getDatanodeDetails(),
+ new NodeStatus(IN_SERVICE, STALE));
+ replicationManager.processContainersNow();
+ Thread.sleep(100L);
+
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+ }
+
+ /**
+ * Before Replication Manager creates a CompletableFuture for a move
+ * operation, some prerequisites must be satisfied; otherwise move()
+ * returns an already-completed future holding the failure reason.
+ */
+ @Test
+ public void testMovePrerequisites()
+ throws SCMException, NodeNotFoundException,
+ InterruptedException, ExecutionException {
+ // a CLOSED container with replicas on dn1, dn2 and dn4; dn3 has none
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ ContainerID id = container.containerID();
+ ContainerReplica dn1 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ ContainerReplica dn2 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+ ContainerReplica dn4 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+
+ CompletableFuture<MoveResult> cf;
+ // RM must reject a move while it is not running. Stopping RM is also
+ // expected to clear any inflightReplication / inflightDeletion state,
+ // which could otherwise affect the checks below; RM is restarted right
+ // after this check.
+ replicationManager.stop();
+ Thread.sleep(100L);
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.RM_NOT_RUNNING);
+ replicationManager.start();
+ Thread.sleep(100L);
+
+ // container is not in CLOSED state
+ for (LifeCycleState state : LifeCycleState.values()) {
+ if (state != LifeCycleState.CLOSED) {
+ container.setState(state);
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+ }
+ }
+ container.setState(LifeCycleState.CLOSED);
+
+ // node is not HEALTHY, either as the move target or as the move source
+ for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
+ if (state != HEALTHY) {
+ nodeManager.setNodeStatus(dn3,
+ new NodeStatus(IN_SERVICE, state));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+ cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+ }
+ }
+ nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+
+ // node is not IN_SERVICE, either as the move target or as the move source
+ for (HddsProtos.NodeOperationalState state :
+ HddsProtos.NodeOperationalState.values()) {
+ if (state != IN_SERVICE) {
+ nodeManager.setNodeStatus(dn3,
+ new NodeStatus(state, HEALTHY));
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+ cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+ }
+ }
+ nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+
+ // container already exists on the target datanode
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(),
+ dn2.getDatanodeDetails());
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+
+ // container does not exist on the source datanode (dn3 has no replica)
+ cf = replicationManager.move(id, dn3, dn3);
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+
+ // NOTE(review): RM was already restarted above, so this start() call
+ // looks redundant.
+ replicationManager.start();
+ // make the container over-replicated (five replicas) so that RM
+ // schedules a deletion, to test the case that the container is in
+ // inflightDeletion
+ ContainerReplica dn5 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
+ ContainerReplica dn6 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
+ replicationManager.processContainersNow();
+ // wait for RM to record the in-flight deletion
+ Thread.sleep(100L);
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+ resetReplicationManager();
+
+ // drop back to two replicas (dn1 and dn2) so that RM schedules a
+ // replication, to test the case that the container is in
+ // inflightReplication
+ containerStateManager.removeContainerReplica(id, dn6);
+ containerStateManager.removeContainerReplica(id, dn5);
+ containerStateManager.removeContainerReplica(id, dn4);
+ // RM should now generate an in-flight replication
+ replicationManager.processContainersNow();
+ // wait for RM to record the in-flight replication
+ Thread.sleep(100L);
+ cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+ }
+
@Test
public void testReplicateCommandTimeout() throws
SCMException, InterruptedException {
@@ -1112,21 +1359,39 @@ public class TestReplicationManager {
return container;
}
- private ContainerReplica addReplica(ContainerInfo container,
- NodeStatus nodeStatus, State replicaState)
- throws ContainerNotFoundException {
+ /**
+ * Registers a new random datanode with the given status (including its
+ * persisted operational state and expiry) and returns it.
+ */
+ private DatanodeDetails addNode(NodeStatus nodeStatus) {
DatanodeDetails dn = randomDatanodeDetails();
dn.setPersistedOpState(nodeStatus.getOperationalState());
dn.setPersistedOpStateExpiryEpochSec(
nodeStatus.getOpStateExpiryEpochSeconds());
nodeManager.register(dn, nodeStatus);
+ return dn;
+ }
+
+ /**
+ * Stops and then restarts the replication manager, sleeping 100 ms after
+ * each step (presumably to give RM's background thread time to react).
+ */
+ private void resetReplicationManager() throws InterruptedException {
+ replicationManager.stop();
+ Thread.sleep(100L);
+ replicationManager.start();
+ Thread.sleep(100L);
+ }
+
+ /**
+ * Registers a new datanode with the given status and adds a replica of
+ * the container, in the given replica state, on that datanode.
+ */
+ private ContainerReplica addReplica(ContainerInfo container,
+ NodeStatus nodeStatus, State replicaState)
+ throws ContainerNotFoundException {
+ DatanodeDetails dn = addNode(nodeStatus);
+ return addReplicaToDn(container, dn, replicaState);
+ }
+
+ private ContainerReplica addReplicaToDn(ContainerInfo container,
+ DatanodeDetails dn, State replicaState)
+ throws ContainerNotFoundException {
// Using the same originID for all replica in the container set. If each
// replica has a unique originID, it causes problems in ReplicationManager
// when processing over-replicated containers.
final UUID originNodeId =
UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
final ContainerReplica replica = getReplicas(
- container.containerID(), CLOSED, 1000L, originNodeId, dn);
+ container.containerID(), replicaState, 1000L, originNodeId, dn);
containerStateManager
.updateContainerReplica(container.containerID(), replica);
return replica;
@@ -1196,5 +1461,4 @@ public class TestReplicationManager {
dc.getDatanodeId().equals(datanode.getUuid()));
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]