JacksonYao287 commented on a change in pull request #2441:
URL: https://github.com/apache/ozone/pull/2441#discussion_r675438614



##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +

Review comment:
       It would be better to also log how many overUtilizedNodes and
underUtilizedNodes were found.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -200,6 +228,8 @@ private boolean initializeIteration() {
     // find over and under utilized nodes
     for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
       double utilization = calculateUtilization(datanodeUsageInfo);
+      LOG.info("Utilization for node {} is {}",
+          datanodeUsageInfo.getDatanodeDetails().getUuidString(), utilization);
       if (utilization > upperLimit) {
         overUtilizedNodes.add(datanodeUsageInfo);
         countDatanodesToBalance += 1;

Review comment:
       Remove `countDatanodesToBalance`, `overLoadedBytes`, and
`underLoadedBytes`; they are not used here.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -113,49 +130,50 @@ public boolean start(
     lock.lock();
     try {
       if (!balancerRunning.compareAndSet(false, true)) {

Review comment:
       Before starting the balancer thread, do we need to check whether the
current SCM is out of safe mode and is the Ratis leader?

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -113,49 +130,50 @@ public boolean start(
     lock.lock();

Review comment:
       It seems we do not need a lock here. `balancerRunning.compareAndSet`
already has the same effect of preventing concurrent balancer threads.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +
+        " balanced.");
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
+    return true;
+  }
+
+  private boolean doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      if (!checkConditionsForBalancing()) {
+        LOG.info("Conditions for balancing failed. Stopping Container " +
+            "Balancer...");
         return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" 
+
-            " balanced.");
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        // move container
+        // if move successful, do the following
+        potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+            selectedTargets, moveSelection, source);
+      }
+    }
+
+    // if not all underUtilized nodes have been selected, try to match
+    // withinThresholdUtilized nodes with underUtilized nodes
+    if (selectedTargets.size() < underUtilizedNodes.size()) {
+      potentialTargets.removeAll(selectedTargets);
+      Collections.reverse(withinThresholdUtilizedNodes);
+
+      for (DatanodeUsageInfo datanodeUsageInfo : withinThresholdUtilizedNodes) 
{
+        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+        if (!checkConditionsForBalancing()) {
+          LOG.info("Conditions for balancing failed. Stopping Container " +
+              "Balancer...");
+          return false;
+        }
+
+        ContainerMoveSelection moveSelection = matchSourceWithTarget(source,
+            potentialTargets);
+
+        if (moveSelection != null) {
+          LOG.info("ContainerBalancer is trying to move container {} from " +
+                  "source datanode {} to target datanode {}",
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
+              moveSelection.getTargetNode().getUuidString());
+
+          // move container
+          // if move successful, do the following
+          potentialTargets = 
updateTargetsAndSelectionCriteria(potentialTargets,
+              selectedTargets, moveSelection, source);
+        }
       }
     }
     return true;
   }
 
   /**
-   * Performs binary search to determine if the specified listToSearch
-   * contains the specified node.
-   *
-   * @param listToSearch List of DatanodeUsageInfo to be searched.
-   * @param node DatanodeUsageInfo to be searched for.
-   * @return true if the specified node is present in listToSearch, otherwise
-   * false.
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   * @param potentialTargets List of potential targets to move container to
+   * @return ContainerMoveSelection containing the selected target and 
container
    */
-  private boolean containsNode(
-      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
-    int index = 0;
-    Comparator<DatanodeUsageInfo> comparator =
-        DatanodeUsageInfo.getMostUsedByRemainingRatio();
-    int size = listToSearch.size();
-    if (size == 0) {
-      return false;
+  private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source,
+                                    List<DatanodeDetails> potentialTargets) {
+
+    NavigableSet<ContainerID> candidateContainers =
+        selectionCriteria.getCandidateContainers(source);
+
+    if (candidateContainers.isEmpty()) {
+      LOG.info("ContainerBalancer could not find any candidate containers for" 
+
+          " datanode {}", source.getUuidString());
+      return null;
+    }
+    LOG.info("ContainerBalancer is finding suitable target for source " +
+        "datanode {}", source.getUuidString());
+    ContainerMoveSelection moveSelection =
+        findTargetStrategy.findTargetForContainerMove(
+            source, potentialTargets, candidateContainers,
+            this::canSizeEnterTarget);
+
+    if (moveSelection == null) {
+      LOG.info("ContainerBalancer could not find a suitable target for " +
+          "source node {}.", source.getUuidString());
+      return null;
     }
 
-    if (comparator.compare(listToSearch.get(0),
-        listToSearch.get(size - 1)) < 0) {
-      index =
-          Collections.binarySearch(listToSearch, node, comparator.reversed());
-    } else {
-      index = Collections.binarySearch(listToSearch, node, comparator);
+    LOG.info("ContainerBalancer matched source datanode {} with target " +
+            "datanode {} for container move.", source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    return moveSelection;
+  }
+
+  /**
+   * Checks if limits maxDatanodesToBalance and maxSizeToMove have not been 
hit.
+   * @return true if conditions pass and balancing can continue, else false
+   */
+  private boolean checkConditionsForBalancing() {

Review comment:
       Maybe we also need to check whether this SCM is currently the leader?

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -113,49 +130,50 @@ public boolean start(
     lock.lock();
     try {
       if (!balancerRunning.compareAndSet(false, true)) {
-        LOG.info("Container Balancer is already running.");
+        LOG.error("Container Balancer is already running.");
         return false;
       }
-      
+
+      ozoneConfiguration = new OzoneConfiguration();
       this.config = balancerConfiguration;
-      this.idleIteration = config.getIdleIteration();
-      this.threshold = config.getThreshold();
-      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
-      this.maxSizeToMoveInGB = config.getMaxSizeToMove();
-      this.unBalancedNodes = new ArrayList<>();
       LOG.info("Starting Container Balancer...{}", this);
+
       //we should start a new balancer thread async
       //and response to cli as soon as possible
 
 
       //TODO: this is a temporary implementation
       //modify this later
-      currentBalancingThread = new Thread(() -> balance());
+      currentBalancingThread = new Thread(this::balance);
       currentBalancingThread.start();
       ////////////////////////
     } finally {
       lock.unlock();
     }
 
-
     return true;
   }
 
   /**
    * Balances the cluster.
    */
   private void balance() {
-    for (int i = 0; i < idleIteration; i++) {
-      if (!initializeIteration()) {
+    countDatanodesBalanced = 0;
+    totalSizeMoved = 0;
+    int i = 0;
+    do {
+      this.idleIteration = config.getIdleIteration();
+      this.threshold = config.getThreshold();
+      this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
+      this.maxSizeToMove = config.getMaxSizeToMove();

Review comment:
       Move this code out of the loop; also, a `for` loop is preferred here.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -180,6 +198,16 @@ private boolean initializeIteration() {
       return false;
     }
 
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.selectedContainers = new HashSet<>();
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();

Review comment:
       remove this

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -84,25 +104,22 @@ public ContainerBalancer(
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
       OzoneConfiguration ozoneConfiguration,
-      final SCMContext scmContext) {
+      final SCMContext scmContext,
+      PlacementPolicy placementPolicy) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
     this.config = new ContainerBalancerConfiguration();
     this.metrics = new ContainerBalancerMetrics();
     this.scmContext = scmContext;
+    this.placementPolicy = placementPolicy;
 
-    this.clusterCapacity = 0L;
-    this.clusterUsed = 0L;
-    this.clusterRemaining = 0L;

Review comment:
       Remove `clusterRemaining` from the current code; it seems to be unused.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +
+        " balanced.");
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
+    return true;
+  }
+
+  private boolean doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      if (!checkConditionsForBalancing()) {
+        LOG.info("Conditions for balancing failed. Stopping Container " +
+            "Balancer...");
         return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" 
+
-            " balanced.");
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        // move container
+        // if move successful, do the following
+        potentialTargets = updateTargetsAndSelectionCriteria(potentialTargets,
+            selectedTargets, moveSelection, source);
+      }
+    }
+
+    // if not all underUtilized nodes have been selected, try to match
+    // withinThresholdUtilized nodes with underUtilized nodes
+    if (selectedTargets.size() < underUtilizedNodes.size()) {
+      potentialTargets.removeAll(selectedTargets);
+      Collections.reverse(withinThresholdUtilizedNodes);
+
+      for (DatanodeUsageInfo datanodeUsageInfo : withinThresholdUtilizedNodes) 
{
+        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+        if (!checkConditionsForBalancing()) {
+          LOG.info("Conditions for balancing failed. Stopping Container " +
+              "Balancer...");
+          return false;
+        }
+
+        ContainerMoveSelection moveSelection = matchSourceWithTarget(source,
+            potentialTargets);
+
+        if (moveSelection != null) {
+          LOG.info("ContainerBalancer is trying to move container {} from " +
+                  "source datanode {} to target datanode {}",
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
+              moveSelection.getTargetNode().getUuidString());
+
+          // move container
+          // if move successful, do the following
+          potentialTargets = 
updateTargetsAndSelectionCriteria(potentialTargets,
+              selectedTargets, moveSelection, source);
+        }
       }
     }
     return true;
   }
 
   /**
-   * Performs binary search to determine if the specified listToSearch
-   * contains the specified node.
-   *
-   * @param listToSearch List of DatanodeUsageInfo to be searched.
-   * @param node DatanodeUsageInfo to be searched for.
-   * @return true if the specified node is present in listToSearch, otherwise
-   * false.
+   * Match a source datanode with a target datanode and identify the container
+   * to move.
+   * @param potentialTargets List of potential targets to move container to
+   * @return ContainerMoveSelection containing the selected target and 
container
    */
-  private boolean containsNode(
-      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
-    int index = 0;
-    Comparator<DatanodeUsageInfo> comparator =
-        DatanodeUsageInfo.getMostUsedByRemainingRatio();
-    int size = listToSearch.size();
-    if (size == 0) {
-      return false;
+  private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source,
+                                    List<DatanodeDetails> potentialTargets) {
+
+    NavigableSet<ContainerID> candidateContainers =
+        selectionCriteria.getCandidateContainers(source);
+
+    if (candidateContainers.isEmpty()) {
+      LOG.info("ContainerBalancer could not find any candidate containers for" 
+
+          " datanode {}", source.getUuidString());
+      return null;
+    }
+    LOG.info("ContainerBalancer is finding suitable target for source " +
+        "datanode {}", source.getUuidString());
+    ContainerMoveSelection moveSelection =
+        findTargetStrategy.findTargetForContainerMove(
+            source, potentialTargets, candidateContainers,
+            this::canSizeEnterTarget);
+
+    if (moveSelection == null) {
+      LOG.info("ContainerBalancer could not find a suitable target for " +
+          "source node {}.", source.getUuidString());
+      return null;
     }
 
-    if (comparator.compare(listToSearch.get(0),
-        listToSearch.get(size - 1)) < 0) {
-      index =
-          Collections.binarySearch(listToSearch, node, comparator.reversed());
-    } else {
-      index = Collections.binarySearch(listToSearch, node, comparator);
+    LOG.info("ContainerBalancer matched source datanode {} with target " +
+            "datanode {} for container move.", source.getUuidString(),
+        moveSelection.getTargetNode().getUuidString());
+
+    return moveSelection;
+  }
+
+  /**
+   * Checks if limits maxDatanodesToBalance and maxSizeToMove have not been 
hit.
+   * @return true if conditions pass and balancing can continue, else false
+   */
+  private boolean checkConditionsForBalancing() {
+    if (countDatanodesBalanced + 2 > maxDatanodesToBalance) {
+      LOG.info("Hit max datanodes to balance limit. {} datanodes have already" 
+
+              " been balanced and the limit is {}.", countDatanodesBalanced,
+          maxDatanodesToBalance);
+      return false;
     }
-    return index >= 0 && listToSearch.get(index).equals(node);
+    if (totalSizeMoved + (long) ozoneConfiguration.getStorageSize(

Review comment:
       Suppose totalSizeMoved = 97G and maxSizeToMove = 100G, and we have a
closed 2G (< default 5G) container replica. Can we move it?

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +
+        " balanced.");
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+
+    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
+    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
+        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
 
-      if (unBalancedNodes.isEmpty()) {
-        LOG.info("Did not find any unbalanced Datanodes.");
+    return true;
+  }
+
+  private boolean doIteration() {
+    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    Set<DatanodeDetails> selectedTargets =
+        new HashSet<>(potentialTargets.size());
+
+    // match each overUtilized node with a target
+    for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+      if (!checkConditionsForBalancing()) {
+        LOG.info("Conditions for balancing failed. Stopping Container " +
+            "Balancer...");
         return false;
-      } else {
-        LOG.info("Container Balancer has identified Datanodes that need to be" 
+
-            " balanced.");
+      }
+
+      ContainerMoveSelection moveSelection =
+          matchSourceWithTarget(source, potentialTargets);
+
+      if (moveSelection != null) {
+        LOG.info("ContainerBalancer is trying to move container {} from " +
+                "source datanode {} to target datanode {}",
+            moveSelection.getContainerID().toString(), source.getUuidString(),
+            moveSelection.getTargetNode().getUuidString());
+
+        // move container

Review comment:
       Should we call replicationManager#move here?

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -226,78 +256,188 @@ private boolean initializeIteration() {
     }
     Collections.reverse(underUtilizedNodes);
 
-    long countDatanodesBalanced = 0;
-    // count number of nodes that were balanced in previous iteration
-    for (DatanodeUsageInfo node : unBalancedNodes) {
-      if (!containsNode(overUtilizedNodes, node) &&
-          !containsNode(underUtilizedNodes, node)) {
-        countDatanodesBalanced += 1;
-      }
-    }
-    // calculate total number of nodes that have been balanced so far
-    countDatanodesBalanced =
-        metrics.incrementDatanodesNumBalanced(countDatanodesBalanced);
-
     unBalancedNodes = new ArrayList<>(
         overUtilizedNodes.size() + underUtilizedNodes.size());
+    unBalancedNodes.addAll(overUtilizedNodes);
+    unBalancedNodes.addAll(underUtilizedNodes);
 
-    if (countDatanodesBalanced + countDatanodesToBalance >
-        maxDatanodesToBalance) {
-      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
-          "Balancer. Stopping Balancer.");
+    if (unBalancedNodes.isEmpty()) {
+      LOG.info("Did not find any unbalanced Datanodes.");
       return false;
-    } else {
-      unBalancedNodes.addAll(overUtilizedNodes);
-      unBalancedNodes.addAll(underUtilizedNodes);
-
-      //for now, we just sleep to simulate the execution of balancer
-      //this if for acceptance test now. modify this later when balancer
-      //if fully completed
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      /////////////////////////////
+    }
+
+    LOG.info("Container Balancer has identified Datanodes that need to be" +
+        " balanced.");
+
+    selectionCriteria = new ContainerBalancerSelectionCriteria(config,
+        nodeManager, replicationManager, containerManager);
+    sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
+        withinThresholdUtilizedNodes.size());
+
+    // initialize maps to track how much size is leaving and entering datanodes
+    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +

Review comment:
       Maybe we can clear the HashMap and reuse it instead of creating a
new one every time.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to