sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108882897


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java:
##########
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManager implements
+    ContainerReplicaPendingOpsSubscriber {
+
+  /**
+   * Possible outcomes of a move scheduled through the MoveManager.
+   */
+  public enum MoveResult {
+    // both replication and deletion completed successfully
+    COMPLETED,
+    // RM is not the Ratis leader
+    FAIL_LEADER_NOT_READY,
+    // replication failed because the container does not exist on the source
+    REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
+    // replication failed because the container already exists on the target
+    REPLICATION_FAIL_EXIST_IN_TARGET,
+    // replication failed because the container is not closed
+    REPLICATION_FAIL_CONTAINER_NOT_CLOSED,
+    // replication failed because the container is in inflightDeletion
+    REPLICATION_FAIL_INFLIGHT_DELETION,
+    // replication failed because the container is in inflightReplication
+    REPLICATION_FAIL_INFLIGHT_REPLICATION,
+    // replication failed because it timed out
+    REPLICATION_FAIL_TIME_OUT,
+    // replication failed because the node is not in service
+    REPLICATION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication failed because the node is unhealthy
+    REPLICATION_FAIL_NODE_UNHEALTHY,
+    // replication succeeded, but deletion failed because it timed out
+    DELETION_FAIL_TIME_OUT,
+    // deletion failed because the node is not in service
+    DELETION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication succeeded, but deletion failed because the
+    // node is unhealthy
+    DELETION_FAIL_NODE_UNHEALTHY,
+    // replication succeeded, but deleting the container from the
+    // source datanode would violate the policy (e.g. replica count
+    // or rack placement), so the source replica must not be
+    // deleted
+    DELETE_FAIL_POLICY,
+    // replicas + target - src does not satisfy the placement policy
+    REPLICATION_NOT_HEALTHY,
+    // writing to the DB failed
+    FAIL_CAN_NOT_RECORD_TO_DB
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManager.class);
+
+  // TODO - Should pending ops notify under lock, so MM can schedule the
+  //        delete after the move before anything else (e.g. RM) can?
+
+  // TODO - make these configurable, probably via the balancer config
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour, in ms
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+
+  private final Map<ContainerID,
+      Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> pendingMoves =
+      new ConcurrentHashMap<>();
+
+  private volatile boolean running = false;
+
+  public MoveManager(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager) {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * Get all the pending move operations.
+   *
+   * @return the map of pending moves keyed by container ID; each value pairs
+   *         the move's result future with its source/target datanodes. This
+   *         is the internal map itself, not a copy, so changes made through
+   *         it are visible to this MoveManager.
+   */
+  public Map<ContainerID,
+      Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> getPendingMove() {
+    return pendingMoves;
+  }
+
+  /**
+   * Complete the move action for the given container.
+   *
+   * Removes the container's entry from the pending moves. If that entry has
+   * a future and {@code mr} is non-null, the future is completed with
+   * {@code mr}. Does nothing if no move is pending for the container.
+   *
+   * @param cid ID of the container whose move has finished
+   * @param mr the result to complete the move's future with; may be null
+   *           when the caller knows no future needs to be completed
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    Pair<CompletableFuture<MoveResult>, MoveDataNodePair> move =
+        pendingMoves.remove(cid);
+    if (move != null) {
+      CompletableFuture<MoveResult> future = move.getLeft();
+      if (future != null && mr != null) {
+        // A null mr is passed when the caller knows the future does not
+        // exist but still wants the move completed (i.e. removed from the
+        // pending moves). For example, after becoming the new leader we
+        // may need to complete some moves whose futures do not
+        // exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoves.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoves.putIfAbsent(containerInfo.containerID(), Pair.of(ret, mp));

Review Comment:
   Yeah, makes sense. I had to change it a little to handle exceptions in
sendReplicateCommand, but the general idea is the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to