siddhantsangwan commented on a change in pull request #2441:
URL: https://github.com/apache/ozone/pull/2441#discussion_r685988635



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -224,86 +291,284 @@ private boolean initializeIteration() {
         withinThresholdUtilizedNodes.add(datanodeUsageInfo);
       }
     }
+    metrics.setDatanodesNumToBalance(new LongMetric(countDatanodesToBalance));
+    // TODO update dataSizeToBalanceGB metric with overLoadedBytes and
+    //  underLoadedBytes
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
+    }
+
+    LOG.info("Container Balancer has identified {} Over-Utilized and {} " +
+            "Under-Utilized Datanodes that need to be balanced.",
+        overUtilizedNodes.size(), underUtilizedNodes.size());
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
+    return true;
+  }
+
+  private IterationResult doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+    moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      IterationResult result = checkConditionsForBalancing();
+      if (result != null) {
+        LOG.info("Conditions for balancing failed. Exiting current " +
+            "iteration...");
+        return result;
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        if (moveContainer(source, moveSelection)) {
+          // consider move successful for now, and update selection criteria
+          potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+              selectedTargets, moveSelection, source);
+        }
+      }
+    }
+
+    // if not all underUtilized nodes have been selected, try to match
+    // withinThresholdUtilized nodes with underUtilized nodes
+    if (selectedTargets.size() < underUtilizedNodes.size()) {
+      potentialTargets.removeAll(selectedTargets);
+      Collections.reverse(withinThresholdUtilizedNodes);
+
+      for (DatanodeUsageInfo datanodeUsageInfo : withinThresholdUtilizedNodes) {
+        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+        IterationResult result = checkConditionsForBalancing();
+        if (result != null) {
+          LOG.info("Conditions for balancing failed. Exiting current " +
+              "iteration...");
+          return result;
+        }
+
+        ContainerMoveSelection moveSelection =
+            matchSourceWithTarget(source, potentialTargets);
+
+        if (moveSelection != null) {
+          LOG.info("ContainerBalancer is trying to move container {} from " +
+                  "source datanode {} to target datanode {}",
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
+              moveSelection.getTargetNode().getUuidString());
+
+          if (moveContainer(source, moveSelection)) {
+            // consider move successful for now, and update selection criteria
+            potentialTargets =
+                updateTargetsAndSelectionCriteria(potentialTargets,
+                    selectedTargets, moveSelection, source);
+          }
+        }
+      }
+    }
+
+    // check move results
+    for (Map.Entry<ContainerMoveSelection,
+            CompletableFuture<ReplicationManager.MoveResult>>
+        futureEntry : moveSelectionToFutureMap.entrySet()) {
+      ContainerMoveSelection moveSelection = futureEntry.getKey();
+      CompletableFuture<ReplicationManager.MoveResult> future =
+          futureEntry.getValue();
       try {
-        Thread.sleep(50);
+        ReplicationManager.MoveResult result = future.get(
+            config.getMoveTimeout().toMillis(), TimeUnit.MILLISECONDS);
+        if (result == ReplicationManager.MoveResult.COMPLETED) {
+          metrics.incrementMovedContainersNum(1);
+          //TODO update metrics with size moved in this iteration
+        }
       } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+        LOG.warn("Container move for container {} was interrupted.",
+            moveSelection.getContainerID(), e);
+      } catch (ExecutionException e) {
+        LOG.warn("Container move for container {} completed exceptionally.",
+            moveSelection.getContainerID(), e);
+      } catch (TimeoutException e) {
+        LOG.warn("Container move for container {} timed out.",
+            moveSelection.getContainerID(), e);
       }
-      /////////////////////////////
+    }
+    return IterationResult.ITERATION_COMPLETED;
+  }
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
-        return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" +
-            " balanced.");
-      }
+  /**
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   *
+   * @param potentialTargets Collection of potential targets to move
+   *                         container to
+   * @return ContainerMoveSelection containing the selected target and container
+   */
+  private ContainerMoveSelection matchSourceWithTarget(
+      DatanodeDetails source, Collection<DatanodeDetails> potentialTargets) {
+    NavigableSet<ContainerID> candidateContainers =
+        selectionCriteria.getCandidateContainers(source);
+
+    if (candidateContainers.isEmpty()) {
+      LOG.info("ContainerBalancer could not find any candidate containers for" +
+          " datanode {}", source.getUuidString());
+      return null;
     }
-    return true;
+    LOG.info("ContainerBalancer is finding suitable target for source " +
+        "datanode {}", source.getUuidString());
+    ContainerMoveSelection moveSelection =
+        findTargetStrategy.findTargetForContainerMove(
+            source, potentialTargets, candidateContainers,
+            this::canSizeEnterTarget);
+
+    if (moveSelection == null) {
+      LOG.info("ContainerBalancer could not find a suitable target for " +
+          "source node {}.", source.getUuidString());
+      return null;
+    }
+    LOG.info("ContainerBalancer matched source datanode {} with target " +
+            "datanode {} for container move.", source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    return moveSelection;
   }
 
   /**
-   * Performs binary search to determine if the specified listToSearch
-   * contains the specified node.
+   * Checks if limits maxDatanodesToInvolvePerIteration and
+   * maxSizeToMovePerIteration have not been hit.
    *
-   * @param listToSearch List of DatanodeUsageInfo to be searched.
-   * @param node DatanodeUsageInfo to be searched for.
-   * @return true if the specified node is present in listToSearch, otherwise
-   * false.
+   * @return {@link IterationResult#MAX_DATANODES_TO_INVOLVE_REACHED} if reached
+   * max datanodes to involve limit,
+   * {@link IterationResult#MAX_SIZE_TO_MOVE_REACHED} if reached max size to
+   * move limit, or null if balancing can continue
    */
-  private boolean containsNode(
-      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
-    int index = 0;
-    Comparator<DatanodeUsageInfo> comparator =
-        DatanodeUsageInfo.getMostUsedByRemainingRatio();
-    int size = listToSearch.size();
-    if (size == 0) {
-      return false;
+  private IterationResult checkConditionsForBalancing() {
+    if (countDatanodesInvolvedPerIteration + 2 >
+        maxDatanodesToInvolvePerIteration * totalNodesInCluster) {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to