siddhantsangwan commented on code in PR #3920:
URL: https://github.com/apache/ozone/pull/3920#discussion_r1051910954
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java:
##########
@@ -2009,5 +1929,256 @@ private void compleleteMoveFutureWithResult(ContainerID
cid, MoveResult mr) {
inflightMoveFuture.remove(cid);
}
}
-}
+ /* HELPER METHODS FOR UNHEALTHY OVER AND UNDER REPLICATED CONTAINERS */
+
+ private void handleOverReplicatedAllUnhealthy(ContainerInfo container,
+ List<ContainerReplica> replicas, int excess) {
+ List<ContainerReplica> deleteCandidates =
+ getUnhealthyDeletionCandidates(container, replicas);
+
+ // Only unhealthy replicas which cannot be closed will remain eligible
+ // for deletion, since this method only deletes unhealthy replicas.
+ closeReplicasIfPossible(container, deleteCandidates);
+ if (deleteCandidates.isEmpty()) {
+ return;
+ }
+
+ if (container.getState() == LifeCycleState.CLOSED) {
+ // Prefer to delete unhealthy replicas with lower BCS IDs.
+ deleteExcessLowestBcsIDs(container, deleteCandidates, excess);
+ } else {
+ // Container is not yet closed.
+ // We only need to save the unhealthy replicas if they
+ // represent unique origin node IDs. If recovering these replicas is
+ // possible in the future they could be used to close the container.
+ Set<UUID> originNodeIDs = replicas.stream()
+ .map(ContainerReplica::getOriginDatanodeId)
+ .collect(Collectors.toSet());
+ deleteExcessWithNonUniqueOriginNodeIDs(container,
+ originNodeIDs, deleteCandidates, excess);
+ }
+ }
+
+ private void handleUnderReplicatedAllUnhealthy(ContainerInfo container,
+ List<ContainerReplica> replicas, ContainerPlacementStatus placementStatus,
+ int additionalReplicasNeeded) {
+
+ int numCloseCmdsSent = closeReplicasIfPossible(container, replicas);
+ // Only replicate unhealthy containers if none of the unhealthy replicas
+ // could be closed. If we sent a close command to an unhealthy replica,
+ // we should wait for that to complete and replicate it when it becomes
+ // healthy on a future iteration.
+ if (numCloseCmdsSent == 0) {
+ // TODO Datanodes currently shuffle sources, so we cannot prioritize
+ // some replicas based on BCSID or origin node ID.
+ replicateAnyWithTopology(container,
+ getReplicationSources(container, replicas), placementStatus,
+ additionalReplicasNeeded);
+ }
+ }
+
+ private int closeReplicasIfPossible(ContainerInfo container,
+ List<ContainerReplica> replicas) {
+ int numCloseCmdsSent = 0;
+ Iterator<ContainerReplica> iterator = replicas.iterator();
+ while (iterator.hasNext()) {
+ final ContainerReplica replica = iterator.next();
+ final State state = replica.getState();
+ if (state == State.OPEN || state == State.CLOSING) {
+ sendCloseCommand(container, replica.getDatanodeDetails(), false);
+ numCloseCmdsSent++;
+ iterator.remove();
+ } else if (state == State.QUASI_CLOSED) {
+ // Send force close command if the BCSID matches
+ if (container.getSequenceId() == replica.getSequenceId()) {
+ sendCloseCommand(container, replica.getDatanodeDetails(), true);
+ numCloseCmdsSent++;
+ iterator.remove();
+ }
+ }
+ }
+
+ return numCloseCmdsSent;
+ }
+
+ /* HELPER METHODS FOR ALL OVER AND UNDER REPLICATED CONTAINERS */
+
+ private void deleteExcess(ContainerInfo container,
+ List<ContainerReplica> deleteCandidates, int excess) {
+ // Replicas which are in maintenance or decommissioned states are not
+ // eligible to be removed, as they do not count toward over-replication
+ // and they also may not be available.
+ deleteCandidates.removeIf(r ->
+ r.getDatanodeDetails().getPersistedOpState() !=
+ NodeOperationalState.IN_SERVICE);
+
+ deleteCandidates.stream().limit(excess).forEach(r ->
+ sendDeleteCommand(container, r.getDatanodeDetails(), true));
+ }
+
+ /**
+ * Remove excess replicas if needed; replicationFactor and placement policy
+ * will be taken into consideration.
+ *
+ * @param excess the excess number after subtracting replicationFactor
+ * @param container ContainerInfo
+ * @param eligibleReplicas a list of replicas, which may have excess replicas
+ */
+ private void deleteExcessWithTopology(int excess,
+ final ContainerInfo container,
+ final List<ContainerReplica> eligibleReplicas) {
+ // After removing all unhealthy replicas, if the container is still over
+ // replicated then we need to check if it is already mis-replicated.
+ // If it is, we do no harm by removing excess replicas. However, if it is
+ // not mis-replicated, then we can only remove replicas if they don't
+ // make the container become mis-replicated.
+ if (excess > 0) {
+ Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
+ final int replicationFactor =
+ container.getReplicationConfig().getRequiredNodes();
+ ContainerPlacementStatus ps =
+ getPlacementStatus(eligibleSet, replicationFactor);
+
+ for (ContainerReplica r : eligibleReplicas) {
+ if (excess <= 0) {
+ break;
+ }
+ // First remove the replica we are working on from the set, and then
+ // check if the set is now mis-replicated.
+ eligibleSet.remove(r);
+ ContainerPlacementStatus nowPS =
+ getPlacementStatus(eligibleSet, replicationFactor);
+ if (isPlacementStatusActuallyEqual(ps, nowPS)) {
+ // Remove the replica if the container was already unsatisfied
+ // and losing this replica keeps the actual placement count unchanged,
+ // OR if the container remains satisfied after losing this replica.
+ sendDeleteCommand(container, r.getDatanodeDetails(), true);
+ excess -= 1;
+ continue;
+ }
+ // If we decided not to remove this replica, put it back into the set
+ eligibleSet.add(r);
+ }
+ if (excess > 0) {
+ LOG.info("The container {} is over replicated with {} excess " +
+ "replica. The excess replicas cannot be removed without " +
+ "violating the placement policy", container, excess);
+ }
+ }
+ }
+
+ private void deleteExcessWithNonUniqueOriginNodeIDs(ContainerInfo container,
+ Set<UUID> existingOriginNodeIDs, List<ContainerReplica> deleteCandidates,
+ int excess) {
Review Comment:
I think this method won't always do what's expected, depending on the
caller's situation. Take the case where
`handleOverReplicatedExcessUnhealthy()` calls this method. If I'm correct, the
expectation is that all unhealthy replicas (here, replicas whose state is not
the same as the container's state) that don't have a unique origin node should
be deleted. However, this method will end up deleting them even if they have
unique origin nodes.
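For reference, this is roughly the filtering I was expecting to happen before
the delete commands go out. It's only a sketch (not a tested patch) that reuses
the `replicas` and `deleteCandidates` variables from
`handleOverReplicatedAllUnhealthy` above:
```
// Count how many replicas carry each origin node ID.
Map<UUID, Long> originCounts = replicas.stream()
    .collect(Collectors.groupingBy(
        ContainerReplica::getOriginDatanodeId, Collectors.counting()));
// A candidate that is the sole holder of its origin node ID should be kept,
// so drop it from the deletion candidates.
deleteCandidates.removeIf(candidate ->
    originCounts.get(candidate.getOriginDatanodeId()) == 1);
```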
I modified an existing test to check this:
```
@Test
public void testQuasiClosedContainerWithUnhealthyReplica()
throws IOException, TimeoutException {
final ContainerInfo container =
getContainer(LifeCycleState.QUASI_CLOSED);
container.setUsedBytes(100);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replica4 = getReplicas(
id, UNHEALTHY, 1000L, randomDatanodeDetails().getUuid(),
randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
containerStateManager.updateContainerReplica(id, replica4);
int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
// // All the QUASI_CLOSED replicas have the same originNodeId, so the
// // container will not be closed. ReplicationManager should take no action.
// replicationManager.processAll();
// eventQueue.processAll(1000);
// Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
// Make the first replica unhealthy
final ContainerReplica unhealthyReplica = getReplicas(
id, UNHEALTHY, 1000L, originNodeId,
replicaOne.getDatanodeDetails());
containerStateManager.updateContainerReplica(
id, unhealthyReplica);
long currentBytesToReplicate = replicationManager.getMetrics()
.getNumReplicationBytesTotal();
replicationManager.processAll();
eventQueue.processAll(1000);
// Under replication handler should first re-replicate one of the quasi
// closed containers.
// The unhealthy container should not have been deleted in the first pass.
assertDeleteScheduled(0);
currentReplicateCommandCount += 1;
currentBytesToReplicate += 100L;
Assertions.assertEquals(currentReplicateCommandCount,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount,
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate,
replicationManager.getMetrics().getNumReplicationBytesTotal());
Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
// We should have one under replicated and one quasi_closed_stuck
ReplicationManagerReport report =
replicationManager.getContainerReport();
Assertions.assertEquals(1,
report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
// Create the replica so replication manager sees it on the next run.
List<CommandForDatanode> replicateCommands = datanodeCommandHandler
.getReceivedCommands().stream()
.filter(c -> c.getCommand().getType()
.equals(SCMCommandProto.Type.replicateContainerCommand))
.collect(Collectors.toList());
for (CommandForDatanode replicateCommand: replicateCommands) {
DatanodeDetails newNode = createDatanodeDetails(
replicateCommand.getDatanodeId());
ContainerReplica newReplica = getReplicas(
id, QUASI_CLOSED, 1000L, originNodeId, newNode);
containerStateManager.updateContainerReplica(id, newReplica);
}
// On the next run, the unhealthy container should be scheduled for
// deletion, since the quasi closed container is now sufficiently
// replicated.
// This method runs an iteration of replication manager.
assertDeleteScheduled(2);
// TODO check that the unhealthy replica was the one deleted.
// Replication should have finished on the previous iteration, leaving
// these numbers unchanged.
Assertions.assertEquals(currentReplicateCommandCount,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount,
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate,
replicationManager.getMetrics().getNumReplicationBytesTotal());
Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightReplication());
// Now we will delete the unhealthy replica.
containerStateManager.removeContainerReplica(id, unhealthyReplica);
// There should be no work left on the following runs.
// TODO this runs correctly but command handler still has two things left.
// replicationManager.processAll();
// eventQueue.processAll(1000);
// Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
}
```
This modified test passes, but I think the expectation is that it should
fail?
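For the TODO about checking which replica was deleted, something like the
following might work. It's again only a sketch, reading the delete commands
from `datanodeCommandHandler.getReceivedCommands()` the same way the
replicate-command check above does, and assuming the intent is that `replica4`
(the one with a unique origin node) survives while the duplicate-origin
`unhealthyReplica` is the one removed:
```
Set<UUID> deleteTargets = datanodeCommandHandler.getReceivedCommands()
    .stream()
    .filter(c -> c.getCommand().getType()
        .equals(SCMCommandProto.Type.deleteContainerCommand))
    .map(CommandForDatanode::getDatanodeId)
    .collect(Collectors.toSet());
// The duplicate-origin unhealthy replica should be the one deleted...
Assertions.assertTrue(deleteTargets.contains(
    unhealthyReplica.getDatanodeDetails().getUuid()));
// ...while replica4, the only replica with its origin node ID, should be kept.
Assertions.assertFalse(deleteTargets.contains(
    replica4.getDatanodeDetails().getUuid()));
```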