This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa1e5ab3b HDDS-7998. Synchronize on containerInfo in ReplicationManager and MoveManager (#4295)
2fa1e5ab3b is described below

commit 2fa1e5ab3b105cf82ead0b5789ff827771bdd74b
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Feb 22 13:55:55 2023 +0000

    HDDS-7998. Synchronize on containerInfo in ReplicationManager and MoveManager (#4295)
---
 .../hdds/scm/container/balancer/MoveManager.java   | 173 +++++++++++----------
 .../container/replication/ReplicationManager.java  |  59 +++----
 .../replication/UnhealthyReplicationProcessor.java |  14 +-
 3 files changed, 128 insertions(+), 118 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java
index a64c9ab42e..773382cde2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java
@@ -238,67 +238,69 @@ public final class MoveManager implements
 
     // Ensure the container exists on the src and is not present on the target
     ContainerInfo containerInfo = containerManager.getContainer(cid);
-    final Set<ContainerReplica> currentReplicas = containerManager
-        .getContainerReplicas(cid);
-
-    boolean srcExists = false;
-    for (ContainerReplica r : currentReplicas) {
-      if (r.getDatanodeDetails().equals(src)) {
-        srcExists = true;
+    synchronized (containerInfo) {
+      final Set<ContainerReplica> currentReplicas = containerManager
+          .getContainerReplicas(cid);
+
+      boolean srcExists = false;
+      for (ContainerReplica r : currentReplicas) {
+        if (r.getDatanodeDetails().equals(src)) {
+          srcExists = true;
+        }
+        if (r.getDatanodeDetails().equals(tgt)) {
+          ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+          return ret;
+        }
       }
-      if (r.getDatanodeDetails().equals(tgt)) {
-        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+      if (!srcExists) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
         return ret;
       }
-    }
-    if (!srcExists) {
-      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
-      return ret;
-    }
 
-    /*
-     * Ensure the container has no inflight actions.
-     * The reason why the given container should not be taking any inflight
-     * action is that: if the given container is being replicated or deleted,
-     * the num of its replica is not deterministic, so move operation issued
-     * by balancer may cause a nondeterministic result, so we should drop
-     * this option for this time.
-     */
-    List<ContainerReplicaOp> pendingOps =
-        replicationManager.getPendingReplicationOps(cid);
-    for (ContainerReplicaOp op : pendingOps) {
-      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
-        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
-        return ret;
-      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
-        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
-        return ret;
+      /*
+       * Ensure the container has no inflight actions.
+       * The reason why the given container should not be taking any inflight
+       * action is that: if the given container is being replicated or deleted,
+       * the num of its replica is not deterministic, so move operation issued
+       * by balancer may cause a nondeterministic result, so we should drop
+       * this option for this time.
+       */
+      List<ContainerReplicaOp> pendingOps =
+          replicationManager.getPendingReplicationOps(cid);
+      for (ContainerReplicaOp op : pendingOps) {
+        if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+          ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+          return ret;
+        } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+          ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+          return ret;
+        }
       }
-    }
 
-    // Ensure the container is CLOSED
-    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
-    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
-      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
-      return ret;
-    }
+      // Ensure the container is CLOSED
+      HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+      if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+        ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+        return ret;
+      }
 
-    // Create a set or replicas that indicates how the container will look
-    // after the move and ensure it is healthy - ie not under, over or mis
-    // replicated.
-    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
-        src, tgt, currentReplicas);
-    ContainerHealthResult healthResult = replicationManager
-        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
-    if (healthResult.getHealthState()
-        != ContainerHealthResult.HealthState.HEALTHY) {
-      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      // Create a set or replicas that indicates how the container will look
+      // after the move and ensure it is healthy - ie not under, over or mis
+      // replicated.
+      Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+          src, tgt, currentReplicas);
+      ContainerHealthResult healthResult = replicationManager
+          .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+      if (healthResult.getHealthState()
+          != ContainerHealthResult.HealthState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+        return ret;
+      }
+      startMove(containerInfo, src, tgt, ret);
+      LOG.debug("Processed a move request for container {}, from {} to {}",
+          cid, src.getUuidString(), tgt.getUuidString());
       return ret;
     }
-    startMove(containerInfo, src, tgt, ret);
-    LOG.debug("Processed a move request for container {}, from {} to {}",
-        cid, src.getUuidString(), tgt.getUuidString());
-    return ret;
   }
 
   /**
@@ -371,41 +373,44 @@ public final class MoveManager implements
     }
     MoveDataNodePair movePair = pair.getRight();
     final DatanodeDetails src = movePair.getSrc();
-    Set<ContainerReplica> currentReplicas = containerManager
-        .getContainerReplicas(cid);
-
-    Set<ContainerReplica> futureReplicas = new HashSet<>(currentReplicas);
-    boolean found = futureReplicas.removeIf(
-        r -> r.getDatanodeDetails().equals(src));
-    if (!found) {
-      // if the target is present but source disappears somehow,
-      // we can consider move is successful.
-      completeMove(cid, MoveResult.COMPLETED);
-      return;
-    }
 
-    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
-    if (nodeStatus.getOperationalState()
-        != HddsProtos.NodeOperationalState.IN_SERVICE) {
-      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
-      return;
-    }
-    if (!nodeStatus.isHealthy()) {
-      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
-      return;
-    }
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    synchronized (containerInfo) {
+      Set<ContainerReplica> currentReplicas = containerManager
+          .getContainerReplicas(cid);
+
+      Set<ContainerReplica> futureReplicas = new HashSet<>(currentReplicas);
+      boolean found = futureReplicas.removeIf(
+          r -> r.getDatanodeDetails().equals(src));
+      if (!found) {
+        // if the target is present but source disappears somehow,
+        // we can consider move is successful.
+        completeMove(cid, MoveResult.COMPLETED);
+        return;
+      }
 
-    final ContainerInfo containerInfo = containerManager.getContainer(cid);
-    ContainerHealthResult healthResult = replicationManager
-        .getContainerReplicationHealth(containerInfo, futureReplicas);
+      final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+      if (nodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+        return;
+      }
+      if (!nodeStatus.isHealthy()) {
+        completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+        return;
+      }
 
-    if (healthResult.getHealthState() ==
-        ContainerHealthResult.HealthState.HEALTHY) {
-      sendDeleteCommand(containerInfo, src);
-    } else {
-      LOG.info("Cannot remove source replica as the container health would " +
-          "be {}", healthResult.getHealthState());
-      completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+      ContainerHealthResult healthResult = replicationManager
+          .getContainerReplicationHealth(containerInfo, futureReplicas);
+
+      if (healthResult.getHealthState() ==
+          ContainerHealthResult.HealthState.HEALTHY) {
+        sendDeleteCommand(containerInfo, src);
+      } else {
+        LOG.info("Cannot remove source replica as the container health would " +
+            "be {}", healthResult.getHealthState());
+        completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+      }
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index a86a051bde..cfb6aa73b9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -661,34 +661,35 @@ public class ReplicationManager implements SCMService {
   protected void processContainer(ContainerInfo containerInfo,
       ReplicationQueue repQueue, ReplicationManagerReport report)
       throws ContainerNotFoundException {
-
-    ContainerID containerID = containerInfo.containerID();
-    Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
-        containerID);
-    List<ContainerReplicaOp> pendingOps =
-        containerReplicaPendingOps.getPendingOps(containerID);
-
-    // There is a different config for EC and Ratis maintenance
-    // minimum replicas, so we must pass through the correct one.
-    int maintRedundancy = maintenanceRedundancy;
-    if (containerInfo.getReplicationType() == RATIS) {
-      maintRedundancy = ratisMaintenanceMinReplicas;
-    }
-    ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
-        .setContainerInfo(containerInfo)
-        .setContainerReplicas(replicas)
-        .setMaintenanceRedundancy(maintRedundancy)
-        .setReport(report)
-        .setPendingOps(pendingOps)
-        .setReplicationQueue(repQueue)
-        .build();
-    // This will call the chain of container health handlers in turn which
-    // will issue commands as needed, update the report and perhaps add
-    // containers to the over and under replicated queue.
-    boolean handled = containerCheckChain.handleChain(checkRequest);
-    if (!handled) {
-      LOG.debug("Container {} had no actions after passing through the " +
-          "check chain", containerInfo.containerID());
+    synchronized (containerInfo) {
+      ContainerID containerID = containerInfo.containerID();
+      Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
+          containerID);
+      List<ContainerReplicaOp> pendingOps =
+          containerReplicaPendingOps.getPendingOps(containerID);
+
+      // There is a different config for EC and Ratis maintenance
+      // minimum replicas, so we must pass through the correct one.
+      int maintRedundancy = maintenanceRedundancy;
+      if (containerInfo.getReplicationType() == RATIS) {
+        maintRedundancy = ratisMaintenanceMinReplicas;
+      }
+      ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
+          .setContainerInfo(containerInfo)
+          .setContainerReplicas(replicas)
+          .setMaintenanceRedundancy(maintRedundancy)
+          .setReport(report)
+          .setPendingOps(pendingOps)
+          .setReplicationQueue(repQueue)
+          .build();
+      // This will call the chain of container health handlers in turn which
+      // will issue commands as needed, update the report and perhaps add
+      // containers to the over and under replicated queue.
+      boolean handled = containerCheckChain.handleChain(checkRequest);
+      if (!handled) {
+        LOG.debug("Container {} had no actions after passing through the " +
+            "check chain", containerInfo.containerID());
+      }
     }
   }
 
@@ -1114,7 +1115,7 @@ public class ReplicationManager implements SCMService {
     return ReplicationManager.class.getSimpleName();
   }
 
-  public synchronized ReplicationManagerMetrics getMetrics() {
+  public ReplicationManagerMetrics getMetrics() {
     return metrics;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 15df85b6f7..372d70c5a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,11 +124,14 @@ public abstract class UnhealthyReplicationProcessor<HealthResult extends
           throws IOException;
 
   private void processContainer(HealthResult healthResult) throws IOException {
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds = getDatanodeCommands(
-            replicationManager, healthResult);
-    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds) {
-      replicationManager.sendDatanodeCommand(cmd.getValue(),
-              healthResult.getContainerInfo(), cmd.getKey());
+    ContainerInfo containerInfo = healthResult.getContainerInfo();
+    synchronized (containerInfo) {
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds = getDatanodeCommands(
+          replicationManager, healthResult);
+      for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds) {
+        replicationManager.sendDatanodeCommand(cmd.getValue(),
+            healthResult.getContainerInfo(), cmd.getKey());
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to