This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 79be6bf78f HDDS-9019. ECUnderReplicationHandler rejects a potential target if it doesn't satisfy placement policy (#5080)
79be6bf78f is described below
commit 79be6bf78f752788b4012c33822ea1097e3ff9ad
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Tue Jul 18 16:12:08 2023 +0530
HDDS-9019. ECUnderReplicationHandler rejects a potential target if it doesn't satisfy placement policy (#5080)
---
.../replication/ECUnderReplicationHandler.java | 136 +++++++++++++--------
.../replication/TestECUnderReplicationHandler.java | 8 +-
2 files changed, 92 insertions(+), 52 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 2afd3c53db..daae24f7f2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -76,20 +77,16 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
this.metrics = replicationManager.getMetrics();
}
- private boolean validatePlacement(List<DatanodeDetails> replicaNodes,
- List<DatanodeDetails> selectedNodes) {
+ private ContainerPlacementStatus validatePlacement(
+ ContainerInfo container,
+ List<DatanodeDetails> replicaNodes,
+ List<DatanodeDetails> selectedNodes) {
+ LOG.trace("Validating placement policy for container {}. Available " +
+ "replica nodes: {}. Selected target nodes: {}.",
+ container.containerID(), replicaNodes, selectedNodes);
List<DatanodeDetails> nodes = new ArrayList<>(replicaNodes);
nodes.addAll(selectedNodes);
- boolean placementStatus = containerPlacement
- .validateContainerPlacement(nodes, nodes.size())
- .isPolicySatisfied();
- if (!placementStatus) {
- LOG.warn("Selected Nodes does not satisfy placement policy: {}. " +
- "Selected nodes: {}. Existing Replica Nodes: {}.",
- containerPlacement.getClass().getName(),
- selectedNodes, replicaNodes);
- }
- return placementStatus;
+ return containerPlacement.validateContainerPlacement(nodes, nodes.size());
}
/**
@@ -111,7 +108,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
final ContainerHealthResult result,
final int remainingMaintenanceRedundancy) throws IOException {
ContainerInfo container = result.getContainerInfo();
- LOG.debug("Handling under-replicated EC container: {}", container);
+ LOG.debug("Handling under-replicated container: {}", container);
final ECContainerReplicaCount replicaCount =
new ECContainerReplicaCount(container, replicas, pendingOps,
@@ -127,6 +124,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
container.getContainerID(), replicaCount.getReplicas());
return 0;
}
+ LOG.debug("Under-replicated container {} currently has replicas: {}.",
+ container.containerID(), replicas);
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
ReplicationManagerUtil.getExcludedAndUsedNodes(
@@ -282,6 +281,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
ECReplicationConfig repConfig =
(ECReplicationConfig)container.getReplicationConfig();
List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
+ LOG.debug("Processing missing indexes {} for container {}.", missingIndexes,
+ container.containerID());
final int expectedTargetCount = missingIndexes.size();
boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity();
if (expectedTargetCount == 0) {
@@ -334,8 +335,19 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
if (targetCount < expectedTargetCount) {
missingIndexes.subList(targetCount, expectedTargetCount).clear();
}
- if (0 < targetCount &&
- validatePlacement(availableSourceNodes, selectedDatanodes)) {
+
+ ContainerPlacementStatus placementStatusWithSelectedTargets =
+ validatePlacement(container, availableSourceNodes, selectedDatanodes);
+ if (!placementStatusWithSelectedTargets.isPolicySatisfied()) {
+ LOG.debug("Target nodes + existing nodes for EC container {}" +
+ " will not satisfy placement policy {}. Reason: {}. Selected" +
+ " nodes: {}. Available source nodes: {}. Resuming " +
+ "reconstruction regardless.",
+ container.containerID(), containerPlacement.getClass().getName(),
+ placementStatusWithSelectedTargets.misReplicatedReason(),
+ selectedDatanodes, availableSourceNodes);
+ }
+ if (0 < targetCount) {
usedNodes.addAll(selectedDatanodes);
// TODO - what are we adding all the selected nodes to available
// sources?
@@ -387,6 +399,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
+ " {}. Available sources are: {}", container.containerID(),
repConfig.getData(), sources.size(), sources);
}
+ LOG.trace("Sent {} commands for container {}.", commandsSent,
+ container.containerID());
return commandsSent;
}
@@ -416,47 +430,60 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
int commandsSent = 0;
if (decomIndexes.size() > 0) {
+ LOG.debug("Processing decommissioning indexes {} for container {}.",
+ decomIndexes, container.containerID());
final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
container, decomIndexes.size(), usedNodes, excludedNodes);
- if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
- usedNodes.addAll(selectedDatanodes);
- Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
- // In this case we need to do one to one copy.
- CommandTargetOverloadedException overloadedException = null;
- for (Integer decomIndex : decomIndexes) {
- Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
- if (source == null) {
- LOG.warn("Cannot find source replica for decommissioning index " +
- "{} in container {}", decomIndex, container.containerID());
- continue;
- }
- ContainerReplica sourceReplica = source.getLeft();
- if (!iterator.hasNext()) {
- LOG.warn("Couldn't find enough targets. Available source"
- + " nodes: {}, the target nodes: {}, excluded nodes: {},"
- + " usedNodes: {}, and the decommission indexes: {}",
- sources.values().stream()
- .map(Pair::getLeft).collect(Collectors.toSet()),
- selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
- break;
- }
- try {
- createReplicateCommand(
- container, iterator, sourceReplica, replicaCount);
- commandsSent++;
- } catch (CommandTargetOverloadedException e) {
- LOG.debug("Unable to send Replicate command for container {}" +
- " index {} because the source node {} is overloaded.",
- container.getContainerID(), sourceReplica.getReplicaIndex(),
- sourceReplica.getDatanodeDetails());
- overloadedException = e;
- }
+ ContainerPlacementStatus placementStatusWithSelectedTargets =
+ validatePlacement(container, availableSourceNodes, selectedDatanodes);
+ if (!placementStatusWithSelectedTargets.isPolicySatisfied()) {
+ LOG.debug("Target nodes + existing nodes for EC container {}" +
+ " will not satisfy placement policy {}. Reason: {}. Selected" +
+ " nodes: {}. Available source nodes: {}. Resuming recovery " +
+ "regardless.",
+ container.containerID(), containerPlacement.getClass().getName(),
+ placementStatusWithSelectedTargets.misReplicatedReason(),
+ selectedDatanodes, availableSourceNodes);
+ }
+
+ usedNodes.addAll(selectedDatanodes);
+ Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
+ // In this case we need to do one to one copy.
+ CommandTargetOverloadedException overloadedException = null;
+ for (Integer decomIndex : decomIndexes) {
+ Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
+ if (source == null) {
+ LOG.warn("Cannot find source replica for decommissioning index " +
+ "{} in container {}", decomIndex, container.containerID());
+ continue;
}
- if (overloadedException != null) {
- throw overloadedException;
+ ContainerReplica sourceReplica = source.getLeft();
+ if (!iterator.hasNext()) {
+ LOG.warn("Couldn't find enough targets. Available source"
+ + " nodes: {}, the target nodes: {}, excluded nodes: {},"
+ + " usedNodes: {}, and the decommission indexes: {}",
+ sources.values().stream()
+ .map(Pair::getLeft).collect(Collectors.toSet()),
+ selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
+ break;
}
+ try {
+ createReplicateCommand(
+ container, iterator, sourceReplica, replicaCount);
+ commandsSent++;
+ } catch (CommandTargetOverloadedException e) {
+ LOG.debug("Unable to send Replicate command for container {}" +
+ " index {} because the source node {} is overloaded.",
+ container.getContainerID(), sourceReplica.getReplicaIndex(),
+ sourceReplica.getDatanodeDetails());
+ overloadedException = e;
+ }
+ }
+ if (overloadedException != null) {
+ throw overloadedException;
}
+
if (selectedDatanodes.size() != decomIndexes.size()) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully replicate the decommission indexes for container {}." +
@@ -467,6 +494,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
selectedDatanodes.size());
}
}
+ LOG.trace("Sent {} commands for container {}.", commandsSent,
+ container.containerID());
return commandsSent;
}
@@ -489,12 +518,17 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
}
ContainerInfo container = replicaCount.getContainer();
+ LOG.debug("Processing maintenance indexes {} for container {}.",
+ maintIndexes, container.containerID());
// this many maintenance replicas need another copy
int additionalMaintenanceCopiesNeeded =
replicaCount.additionalMaintenanceCopiesNeeded(true);
if (additionalMaintenanceCopiesNeeded == 0) {
return 0;
}
+ LOG.debug("Number of maintenance replicas of container {} that need " +
+ "additional copies: {}.", container.containerID(),
+ additionalMaintenanceCopiesNeeded);
List<DatanodeDetails> targets = getTargetDatanodes(
container, maintIndexes.size(), usedNodes, excludedNodes
);
@@ -551,6 +585,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
throw new InsufficientDatanodesException(maintIndexes.size(),
targets.size());
}
+ LOG.trace("Sent {} commands for container {}.", commandsSent,
+ container.containerID());
return commandsSent;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index be95bcee60..e65485709d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -481,6 +481,10 @@ public class TestECUnderReplicationHandler {
availableReplicas, 1, 2, policy);
}
+ /**
+ * The expectation is that an under replicated container should recover
+ * even if datanodes hosting new replicas don't satisfy placement policy.
+ */
@Test
public void testUnderReplicationWithInvalidPlacement()
throws IOException {
@@ -505,8 +509,8 @@ public class TestECUnderReplicationHandler {
.allMatch(dns::contains));
return mockedContainerPlacementStatus;
});
- testUnderReplicationWithMissingIndexes(emptyList(),
- availableReplicas, 0, 0, mockedPolicy);
+ testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+ availableReplicas, 2, 0, mockedPolicy);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]