sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108764537


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  // NOTE(review): neither deadline constant is referenced in the visible part
+  // of this class; MOVE_DEADLINE looks like milliseconds - confirm.
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  // Caller-facing future of every in-flight move, keyed by container.
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  // Source/target pair of every in-flight move, keyed by container.
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  // Set by onLeaderReady and cleared by onNotLeader. While false, move()
+  // fails fast and replica-op notifications are ignored.
+  private volatile boolean running = false;
+
+  /**
+   * Create a move manager that is not yet running; it starts accepting moves
+   * once {@link #onLeaderReady()} is called.
+   *
+   * @param replicationManager source of node status, pending replica ops and
+   *                           container replication health
+   * @param clock time source held by this manager
+   * @param containerManager used to look up containers and their replicas
+   * @throws IOException declared for callers; the body itself never throws
+   */
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.clock = clock;
+    this.containerManager = containerManager;
+    this.replicationManager = replicationManager;
+  }
+
+  /**
+   * Return the source/target pair of every move currently being tracked,
+   * keyed by container.
+   *
+   * <p>This is the live internal map, not a snapshot.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    final Map<ContainerID, MoveDataNodePair> tracked = this.pendingMoveOps;
+    return tracked;
+  }
+
+  /**
+   * completeMove a move action for a given container.
+   *
+   * @param cid Container id to which the move option is finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // when we know the future is null, and we want to complete
+        // the move , then we set mr to null.
+        // for example , after becoming a new leader, we might need
+        // to complete some moves and we know these futures does not
+        // exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * Notify MoveManager that the current SCM has become leader and is ready.
+   *
+   * <p>Moves still tracked from an earlier leadership term can no longer be
+   * followed, because op notifications were ignored while not running. Their
+   * futures are completed with {@code FAIL_LEADER_NOT_READY} so waiters are
+   * released rather than blocked forever. All stale records are then
+   * discarded before new moves are accepted.
+   */
+  @Override
+  public void onLeaderReady() {
+    // running is still false here, so move() cannot add entries concurrently.
+    pendingMoveFuture.values().forEach(
+        stale -> stale.complete(MoveResult.FAIL_LEADER_NOT_READY));
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * Called when this SCM leader steps down. From now on move() fails fast and
+   * replica-op notifications are ignored; tracked moves are kept until
+   * {@link #onLeaderReady()} discards them.
+   */
+  @Override
+  public void onNotLeader() {
+    this.running = false;
+  }
+
+  /**
+   * Move a container replica from the source datanode to the target
+   * datanode. A move has two phases: a replicate command first creates a new
+   * copy of the replica on the target. Later, when that replication
+   * completes, a delete is scheduled to remove the original replica.
+   *
+   * <p>The request is validated before anything is scheduled:
+   * <ul>
+   *   <li>this manager must be running;</li>
+   *   <li>both nodes must be HEALTHY and IN_SERVICE;</li>
+   *   <li>the replica must exist on the source and not on the target;</li>
+   *   <li>the container must have no pending ADD or DELETE op;</li>
+   *   <li>it must be CLOSED;</li>
+   *   <li>the replica set after the move must be HEALTHY.</li>
+   * </ul>
+   * A failed check completes the returned future with the matching
+   * {@link MoveResult}. Lookup failures from the managers propagate as the
+   * declared exceptions.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   * @return future for the outcome of the move
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    final CompletableFuture<MoveResult> result = new CompletableFuture<>();
+
+    if (!running) {
+      result.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return result;
+    }
+
+    // Both endpoints must be HEALTHY and IN_SERVICE; source is checked first.
+    for (final DatanodeDetails node : Arrays.asList(src, tgt)) {
+      final NodeStatus status = replicationManager.getNodeStatus(node);
+      final MoveResult nodeFailure;
+      if (status.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        nodeFailure = MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY;
+      } else if (status.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        nodeFailure = MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE;
+      } else {
+        nodeFailure = null;
+      }
+      if (nodeFailure != null) {
+        result.complete(nodeFailure);
+        return result;
+      }
+    }
+
+    // The replica must already be on the source and not yet on the target.
+    final ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas =
+        containerManager.getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .anyMatch(r -> r.getDatanodeDetails().equals(tgt))) {
+      result.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+      return result;
+    }
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      result.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return result;
+    }
+
+    // While a replication or deletion is in flight the number of replicas is
+    // not deterministic, so a move now could give an unpredictable result.
+    // Skip the container this time; the first pending ADD or DELETE op
+    // decides which reason is reported.
+    for (ContainerReplicaOp op
+        : replicationManager.getPendingReplicationOps(cid)) {
+      final PendingOpType opType = op.getOpType();
+      if (opType == PendingOpType.ADD) {
+        result.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return result;
+      }
+      if (opType == PendingOpType.DELETE) {
+        result.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return result;
+      }
+    }
+
+    if (containerInfo.getState() != HddsProtos.LifeCycleState.CLOSED) {
+      result.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return result;
+    }
+
+    // Model the replica set as it will look after the move and require it to
+    // be healthy, i.e. not under, over or mis-replicated.
+    final ContainerHealthResult healthAfterMove =
+        replicationManager.getContainerReplicationHealth(containerInfo,
+            createReplicaSetAfterMove(containerInfo, src, tgt,
+                currentReplicas));
+    if (healthAfterMove.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      result.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return result;
+    }
+
+    startMove(containerInfo, src, tgt, result);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());
+    return result;
+  }
+
+  /**
+   * Notify Move Manager that a container op has been completed.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has completed
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpCompleted(ContainerReplicaOp 
containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        if (pendingMoveFuture.containsKey(containerID)) {
+          try {
+            handleSuccessfulAdd(containerID);
+          } catch (ContainerNotFoundException | NodeNotFoundException |
+                   ContainerReplicaNotFoundException e) {
+            LOG.warn("Can not handle successful Add for move", e);
+          }
+        } else {
+          LOG.warn("No matching entry found in pendingMoveFuture for " +
+              "containerID {}. Should not happen", containerID);
+          completeMove(containerID, null);
+        }
+      } else if (
+          opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.COMPLETED);
+      }
+    }
+  }
+
+   /**
+   * Notify Move Manager that a container op has been Expired.
+   *
+   * @param containerReplicaOp ContainerReplicaOp
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        completeMove(containerID, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else if (opType.equals(PendingOpType.DELETE) &&
+          mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.DELETION_FAIL_TIME_OUT);
+      }
+    }
+  }
+
+  /**
+   * Start the delete phase of a move once its new replica exists on the
+   * target.
+   *
+   * <p>The move finishes immediately, with no delete sent, in these cases:
+   * <ul>
+   *   <li>the source replica has already disappeared
+   *       ({@code COMPLETED});</li>
+   *   <li>the source node is not IN_SERVICE or not healthy;</li>
+   *   <li>removing the source replica would leave the container in a health
+   *       state other than HEALTHY ({@code DELETE_FAIL_POLICY}).</li>
+   * </ul>
+   * Otherwise a delete command is sent to the source node. The move then
+   * stays pending until that delete completes or expires.
+   *
+   * @param cid container whose move is being processed
+   * @throws ContainerNotFoundException propagated from the container lookups
+   * @throws ContainerReplicaNotFoundException propagated from the lookups or
+   *         the delete command
+   * @throws NodeNotFoundException if the source node's status is unknown
+   */
+  private void handleSuccessfulAdd(final ContainerID cid)
+      throws ContainerNotFoundException,
+      ContainerReplicaNotFoundException, NodeNotFoundException {
+    MoveDataNodePair movePair = pendingMoveOps.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails src = movePair.getSrc();
+    Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      // if the target is present but source disappears somehow,
+      // we can consider move is successful.
+      completeMove(cid, MoveResult.COMPLETED);
+      return;
+    }
+
+    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+    if (nodeStatus.getOperationalState()
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+      return;
+    }
+    if (!nodeStatus.isHealthy()) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+      return;
+    }
+
+    // The replica set as it would be once the source copy is deleted.
+    Set<ContainerReplica> futureReplicas = currentReplicas.stream()
+        .filter(r -> !r.getDatanodeDetails().equals(src))
+        .collect(Collectors.toSet());
+
+    final ContainerInfo containerInfo = containerManager.getContainer(cid);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, futureReplicas);
+
+    if (healthResult.getHealthState() ==
+        ContainerHealthResult.HealthState.HEALTHY) {
+      sendDeleteCommand(containerInfo, src);
+    } else {
+      LOG.info("Cannot remove source replica of container {} as the " +
+          "container health would be {}", cid, healthResult.getHealthState());
+      completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+    }
+  }
+
+  private Set<ContainerReplica> createReplicaSetAfterMove(
+      ContainerInfo containerInfo, DatanodeDetails src,
+      DatanodeDetails tgt, Set<ContainerReplica> existing) {
+    Set<ContainerReplica> replicas = new HashSet<>(existing);
+    ContainerReplica srcReplica = null;
+    for (ContainerReplica r : replicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcReplica = r;
+        break;
+      }
+    }
+    if (srcReplica == null) {
+      throw new RuntimeException("The source replica is not present");
+    }
+    replicas.remove(srcReplica);
+    replicas.add(ContainerReplica.newBuilder()
+        .setDatanodeDetails(tgt)
+        .setContainerID(containerInfo.containerID())
+        .setContainerState(StorageContainerDatanodeProtocolProtos
+            .ContainerReplicaProto.State.CLOSED)
+        .setOriginNodeId(srcReplica.getOriginDatanodeId())
+        .setSequenceId(srcReplica.getSequenceId())
+        .setBytesUsed(srcReplica.getBytesUsed())
+        .setKeyCount(srcReplica.getKeyCount())
+        .setReplicaIndex(srcReplica.getReplicaIndex())

Review Comment:
   Good suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to