This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa1e5ab3b HDDS-7998. Synchronize on containerInfo in ReplicationManager and MoveManager (#4295)
2fa1e5ab3b is described below
commit 2fa1e5ab3b105cf82ead0b5789ff827771bdd74b
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Feb 22 13:55:55 2023 +0000
HDDS-7998. Synchronize on containerInfo in ReplicationManager and MoveManager (#4295)
---
.../hdds/scm/container/balancer/MoveManager.java | 173 +++++++++++----------
.../container/replication/ReplicationManager.java | 59 +++----
.../replication/UnhealthyReplicationProcessor.java | 14 +-
3 files changed, 128 insertions(+), 118 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java
index a64c9ab42e..773382cde2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java
@@ -238,67 +238,69 @@ public final class MoveManager implements
// Ensure the container exists on the src and is not present on the target
ContainerInfo containerInfo = containerManager.getContainer(cid);
- final Set<ContainerReplica> currentReplicas = containerManager
- .getContainerReplicas(cid);
-
- boolean srcExists = false;
- for (ContainerReplica r : currentReplicas) {
- if (r.getDatanodeDetails().equals(src)) {
- srcExists = true;
+ synchronized (containerInfo) {
+ final Set<ContainerReplica> currentReplicas = containerManager
+ .getContainerReplicas(cid);
+
+ boolean srcExists = false;
+ for (ContainerReplica r : currentReplicas) {
+ if (r.getDatanodeDetails().equals(src)) {
+ srcExists = true;
+ }
+ if (r.getDatanodeDetails().equals(tgt)) {
+ ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+ return ret;
+ }
}
- if (r.getDatanodeDetails().equals(tgt)) {
- ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+ if (!srcExists) {
+ ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
return ret;
}
- }
- if (!srcExists) {
- ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
- return ret;
- }
- /*
- * Ensure the container has no inflight actions.
- * The reason why the given container should not be taking any inflight
- * action is that: if the given container is being replicated or deleted,
- * the num of its replica is not deterministic, so move operation issued
- * by balancer may cause a nondeterministic result, so we should drop
- * this option for this time.
- */
- List<ContainerReplicaOp> pendingOps =
- replicationManager.getPendingReplicationOps(cid);
- for (ContainerReplicaOp op : pendingOps) {
- if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
- ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
- return ret;
- } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
- ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
- return ret;
+ /*
+ * Ensure the container has no inflight actions.
+ * The reason why the given container should not be taking any inflight
+ * action is that: if the given container is being replicated or deleted,
+ * the num of its replica is not deterministic, so move operation issued
+ * by balancer may cause a nondeterministic result, so we should drop
+ * this option for this time.
+ */
+ List<ContainerReplicaOp> pendingOps =
+ replicationManager.getPendingReplicationOps(cid);
+ for (ContainerReplicaOp op : pendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+ return ret;
+ } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+ return ret;
+ }
}
- }
- // Ensure the container is CLOSED
- HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
- if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
- ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
- return ret;
- }
+ // Ensure the container is CLOSED
+ HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+ if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+ ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+ return ret;
+ }
- // Create a set or replicas that indicates how the container will look
- // after the move and ensure it is healthy - ie not under, over or mis
- // replicated.
- Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
- src, tgt, currentReplicas);
- ContainerHealthResult healthResult = replicationManager
- .getContainerReplicationHealth(containerInfo, replicasAfterMove);
- if (healthResult.getHealthState()
- != ContainerHealthResult.HealthState.HEALTHY) {
- ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+ // Create a set or replicas that indicates how the container will look
+ // after the move and ensure it is healthy - ie not under, over or mis
+ // replicated.
+ Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+ src, tgt, currentReplicas);
+ ContainerHealthResult healthResult = replicationManager
+ .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+ if (healthResult.getHealthState()
+ != ContainerHealthResult.HealthState.HEALTHY) {
+ ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+ return ret;
+ }
+ startMove(containerInfo, src, tgt, ret);
+ LOG.debug("Processed a move request for container {}, from {} to {}",
+ cid, src.getUuidString(), tgt.getUuidString());
return ret;
}
- startMove(containerInfo, src, tgt, ret);
- LOG.debug("Processed a move request for container {}, from {} to {}",
- cid, src.getUuidString(), tgt.getUuidString());
- return ret;
}
/**
@@ -371,41 +373,44 @@ public final class MoveManager implements
}
MoveDataNodePair movePair = pair.getRight();
final DatanodeDetails src = movePair.getSrc();
- Set<ContainerReplica> currentReplicas = containerManager
- .getContainerReplicas(cid);
-
- Set<ContainerReplica> futureReplicas = new HashSet<>(currentReplicas);
- boolean found = futureReplicas.removeIf(
- r -> r.getDatanodeDetails().equals(src));
- if (!found) {
- // if the target is present but source disappears somehow,
- // we can consider move is successful.
- completeMove(cid, MoveResult.COMPLETED);
- return;
- }
- final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
- if (nodeStatus.getOperationalState()
- != HddsProtos.NodeOperationalState.IN_SERVICE) {
- completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
- return;
- }
- if (!nodeStatus.isHealthy()) {
- completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
- return;
- }
+ ContainerInfo containerInfo = containerManager.getContainer(cid);
+ synchronized (containerInfo) {
+ Set<ContainerReplica> currentReplicas = containerManager
+ .getContainerReplicas(cid);
+
+ Set<ContainerReplica> futureReplicas = new HashSet<>(currentReplicas);
+ boolean found = futureReplicas.removeIf(
+ r -> r.getDatanodeDetails().equals(src));
+ if (!found) {
+ // if the target is present but source disappears somehow,
+ // we can consider move is successful.
+ completeMove(cid, MoveResult.COMPLETED);
+ return;
+ }
- final ContainerInfo containerInfo = containerManager.getContainer(cid);
- ContainerHealthResult healthResult = replicationManager
- .getContainerReplicationHealth(containerInfo, futureReplicas);
+ final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+ if (nodeStatus.getOperationalState()
+ != HddsProtos.NodeOperationalState.IN_SERVICE) {
+ completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+ return;
+ }
+ if (!nodeStatus.isHealthy()) {
+ completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+ return;
+ }
- if (healthResult.getHealthState() ==
- ContainerHealthResult.HealthState.HEALTHY) {
- sendDeleteCommand(containerInfo, src);
- } else {
- LOG.info("Cannot remove source replica as the container health would " +
- "be {}", healthResult.getHealthState());
- completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+ ContainerHealthResult healthResult = replicationManager
+ .getContainerReplicationHealth(containerInfo, futureReplicas);
+
+ if (healthResult.getHealthState() ==
+ ContainerHealthResult.HealthState.HEALTHY) {
+ sendDeleteCommand(containerInfo, src);
+ } else {
+ LOG.info("Cannot remove source replica as the container health would " +
+ "be {}", healthResult.getHealthState());
+ completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index a86a051bde..cfb6aa73b9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -661,34 +661,35 @@ public class ReplicationManager implements SCMService {
protected void processContainer(ContainerInfo containerInfo,
ReplicationQueue repQueue, ReplicationManagerReport report)
throws ContainerNotFoundException {
-
- ContainerID containerID = containerInfo.containerID();
- Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
- containerID);
- List<ContainerReplicaOp> pendingOps =
- containerReplicaPendingOps.getPendingOps(containerID);
-
- // There is a different config for EC and Ratis maintenance
- // minimum replicas, so we must pass through the correct one.
- int maintRedundancy = maintenanceRedundancy;
- if (containerInfo.getReplicationType() == RATIS) {
- maintRedundancy = ratisMaintenanceMinReplicas;
- }
- ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
- .setContainerInfo(containerInfo)
- .setContainerReplicas(replicas)
- .setMaintenanceRedundancy(maintRedundancy)
- .setReport(report)
- .setPendingOps(pendingOps)
- .setReplicationQueue(repQueue)
- .build();
- // This will call the chain of container health handlers in turn which
- // will issue commands as needed, update the report and perhaps add
- // containers to the over and under replicated queue.
- boolean handled = containerCheckChain.handleChain(checkRequest);
- if (!handled) {
- LOG.debug("Container {} had no actions after passing through the " +
- "check chain", containerInfo.containerID());
+ synchronized (containerInfo) {
+ ContainerID containerID = containerInfo.containerID();
+ Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
+ containerID);
+ List<ContainerReplicaOp> pendingOps =
+ containerReplicaPendingOps.getPendingOps(containerID);
+
+ // There is a different config for EC and Ratis maintenance
+ // minimum replicas, so we must pass through the correct one.
+ int maintRedundancy = maintenanceRedundancy;
+ if (containerInfo.getReplicationType() == RATIS) {
+ maintRedundancy = ratisMaintenanceMinReplicas;
+ }
+ ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(replicas)
+ .setMaintenanceRedundancy(maintRedundancy)
+ .setReport(report)
+ .setPendingOps(pendingOps)
+ .setReplicationQueue(repQueue)
+ .build();
+ // This will call the chain of container health handlers in turn which
+ // will issue commands as needed, update the report and perhaps add
+ // containers to the over and under replicated queue.
+ boolean handled = containerCheckChain.handleChain(checkRequest);
+ if (!handled) {
+ LOG.debug("Container {} had no actions after passing through the " +
+ "check chain", containerInfo.containerID());
+ }
}
}
@@ -1114,7 +1115,7 @@ public class ReplicationManager implements SCMService {
return ReplicationManager.class.getSimpleName();
}
- public synchronized ReplicationManagerMetrics getMetrics() {
+ public ReplicationManagerMetrics getMetrics() {
return metrics;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 15df85b6f7..372d70c5a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,11 +124,14 @@ public abstract class UnhealthyReplicationProcessor<HealthResult extends
throws IOException;
private void processContainer(HealthResult healthResult) throws IOException {
- Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds = getDatanodeCommands(
- replicationManager, healthResult);
- for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds) {
- replicationManager.sendDatanodeCommand(cmd.getValue(),
- healthResult.getContainerInfo(), cmd.getKey());
+ ContainerInfo containerInfo = healthResult.getContainerInfo();
+ synchronized (containerInfo) {
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds = getDatanodeCommands(
+ replicationManager, healthResult);
+ for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds) {
+ replicationManager.sendDatanodeCommand(cmd.getValue(),
+ healthResult.getContainerInfo(), cmd.getKey());
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]