GlenGeng commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r637873957



##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " 
+
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;
+
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;
+
+  @Metric(about = "Number of containers that Container Balancer has moved" +
+      " until now.")
+  private LongMetric numContainersMoved;
+
+  @Metric(about = "The total number of datanodes that need to be balanced.")
+  private LongMetric totalNumDatanodesToBalance;
+
+  @Metric(about = "Number of datanodes that Container Balancer has balanced " +
+      "until now.")
+  private LongMetric numDatanodesBalanced;

Review comment:
       NIT: `numDatanodesBalanced` to `datanodeNumBalanced`

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " 
+
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;
+
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;
+
+  @Metric(about = "Number of containers that Container Balancer has moved" +
+      " until now.")
+  private LongMetric numContainersMoved;
+
+  @Metric(about = "The total number of datanodes that need to be balanced.")
+  private LongMetric totalNumDatanodesToBalance;

Review comment:
       NIT: `totalNumDatanodesToBalance` to `datanodeNumToBalance`

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " 
+
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;
+
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;
+
+  @Metric(about = "Number of containers that Container Balancer has moved" +
+      " until now.")
+  private LongMetric numContainersMoved;

Review comment:
       NIT: `numContainersMoved` to `movedContainerNum`

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " 
+
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;

Review comment:
       NIT `totalSizeToBalanceGB` -> `dataSizeToBalanceGB`

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " 
+
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;
+
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;

Review comment:
       NIT: `gigaBytesMoved` to `dataSizeBalancedGB`

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,345 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
     ozoneConfiguration = new OzoneConfiguration();
-
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
+    this.unBalancedNodes = new ArrayList<>();
+
+    LOG.info("Starting Container Balancer...{}", this);
+    balance();
+    return true;
+  }
 
-    LOG.info("Starting Container Balancer...");
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    initializeIteration();
 
+    // unBalancedNodes is not cleared since the next iteration uses this
+    // iteration's unBalancedNodes to find out how many nodes were balanced
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    withinThresholdUtilizedNodes.clear();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under, and
+   * within threshold utilized nodes. Decides whether balancing needs to
+   * continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe 
Mode.");
+      return false;
+    }
     // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+    List<DatanodeUsageInfo> datanodeUsageInfos =
+        nodeManager.getMostOrLeastUsedDatanodes(true);
+    if (datanodeUsageInfos.isEmpty()) {
+      LOG.info("Container Balancer could not retrieve nodes from Node " +
+          "Manager.");
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos);
+    LOG.info("Average utilization of the cluster is {}", 
clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
-    LOG.info("Lower limit for utilization is {}", lowerLimit);
-    LOG.info("Upper limit for utilization is {}", upperLimit);
-
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    double upperLimit = clusterAvgUtilisation + threshold;
+
+    LOG.info("Lower limit for utilization is {} and Upper limit for " +
+        "utilization is {}", lowerLimit, upperLimit);
+
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      double utilization = calculateUtilization(datanodeUsageInfo);
+      if (utilization > upperLimit) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes greater than upper limit in this node
+        overLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+      } else if (utilization < lowerLimit) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes lesser than lower limit in this node
+        underLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    Collections.reverse(underUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : unBalancedNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    // calculate total number of nodes that have been balanced so far
+    numDatanodesBalanced =
+        metrics.incrementNumDatanodesBalanced(numDatanodesBalanced);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" 
+
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int index = 0;
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(listToSearch.size() - 1)) < 0) {
+      index =
+          Collections.binarySearch(listToSearch, node, comparator.reversed());
+    } else {
+      index = Collections.binarySearch(listToSearch, node, comparator);
+    }
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average datanode utilization for the specified nodes.
+   * Utilization is used space divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   * @throws ArithmeticException Division by zero
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes)
+      throws ArithmeticException {
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    try {
+      return clusterUsed / (double) clusterCapacity;
+    } catch (ArithmeticException e) {
+      LOG.warn("Division by zero while calculating average utilization of the" 
+
+          " cluster in ContainerBalancer.");
+      throw e;
+    }
+  }
+
+  /**
+   * Calculates the utilization, that is used space divided by capacity, for
+   * the given datanodeUsageInfo.
+   *
+   * @param datanodeUsageInfo DatanodeUsageInfo to calculate utilization for
+   * @return Utilization value
+   */
+  private double calculateUtilization(DatanodeUsageInfo datanodeUsageInfo) {
+    SCMNodeStat stat = datanodeUsageInfo.getScmNodeStat();
+
+    return stat.getScmUsed().get().doubleValue() /
+        stat.getCapacity().get().doubleValue();
   }
 
+  /**
+   * Stops ContainerBalancer.
+   */
   public void stop() {
     LOG.info("Stopping Container Balancer...");
-    balancerRunning = false;
+    balancerRunning.set(false);

Review comment:
       NIT. remove line 319, one info line is sufficient here, since no actual 
work needs to be done here.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,345 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
     ozoneConfiguration = new OzoneConfiguration();
-
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
+    this.unBalancedNodes = new ArrayList<>();
+
+    LOG.info("Starting Container Balancer...{}", this);
+    balance();
+    return true;
+  }
 
-    LOG.info("Starting Container Balancer...");
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    initializeIteration();
 
+    // unBalancedNodes is not cleared since the next iteration uses this
+    // iteration's unBalancedNodes to find out how many nodes were balanced
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    withinThresholdUtilizedNodes.clear();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under, and
+   * within threshold utilized nodes. Decides whether balancing needs to
+   * continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe 
Mode.");
+      return false;
+    }
     // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+    List<DatanodeUsageInfo> datanodeUsageInfos =
+        nodeManager.getMostOrLeastUsedDatanodes(true);
+    if (datanodeUsageInfos.isEmpty()) {
+      LOG.info("Container Balancer could not retrieve nodes from Node " +
+          "Manager.");
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos);
+    LOG.info("Average utilization of the cluster is {}", 
clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
-    LOG.info("Lower limit for utilization is {}", lowerLimit);
-    LOG.info("Upper limit for utilization is {}", upperLimit);
-
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    double upperLimit = clusterAvgUtilisation + threshold;
+
+    LOG.info("Lower limit for utilization is {} and Upper limit for " +
+        "utilization is {}", lowerLimit, upperLimit);
+
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      double utilization = calculateUtilization(datanodeUsageInfo);
+      if (utilization > upperLimit) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes greater than upper limit in this node
+        overLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+      } else if (utilization < lowerLimit) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes lesser than lower limit in this node
+        underLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    Collections.reverse(underUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : unBalancedNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    // calculate total number of nodes that have been balanced so far
+    numDatanodesBalanced =
+        metrics.incrementNumDatanodesBalanced(numDatanodesBalanced);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" 
+
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int index = 0;
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(listToSearch.size() - 1)) < 0) {
+      index =
+          Collections.binarySearch(listToSearch, node, comparator.reversed());
+    } else {
+      index = Collections.binarySearch(listToSearch, node, comparator);
+    }
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average datanode utilization for the specified nodes.
+   * Utilization is used space divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   * @throws ArithmeticException Division by zero
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes)
+      throws ArithmeticException {
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    try {
+      return clusterUsed / (double) clusterCapacity;
+    } catch (ArithmeticException e) {
+      LOG.warn("Division by zero while calculating average utilization of the" 
+
+          " cluster in ContainerBalancer.");
+      throw e;
+    }
+  }
+
+  /**
+   * Calculates the utilization, that is used space divided by capacity, for
+   * the given datanodeUsageInfo.
+   *
+   * @param datanodeUsageInfo DatanodeUsageInfo to calculate utilization for
+   * @return Utilization value
+   */
+  private double calculateUtilization(DatanodeUsageInfo datanodeUsageInfo) {

Review comment:
       NIT. make it to be a static helper function.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -40,96 +44,345 @@
   private ContainerManagerV2 containerManager;
   private ReplicationManager replicationManager;
   private OzoneConfiguration ozoneConfiguration;
+  private final SCMContext scmContext;
   private double threshold;
   private int maxDatanodesToBalance;
   private long maxSizeToMove;
-  private boolean balancerRunning;
-  private List<DatanodeUsageInfo> sourceNodes;
-  private List<DatanodeUsageInfo> targetNodes;
+  private List<DatanodeUsageInfo> unBalancedNodes;
+  private List<DatanodeUsageInfo> overUtilizedNodes;
+  private List<DatanodeUsageInfo> underUtilizedNodes;
+  private List<DatanodeUsageInfo> withinThresholdUtilizedNodes;
   private ContainerBalancerConfiguration config;
+  private ContainerBalancerMetrics metrics;
+  private long clusterCapacity;
+  private long clusterUsed;
+  private long clusterRemaining;
+  private double clusterAvgUtilisation;
+  private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
 
+  /**
+   * Constructs ContainerBalancer with the specified arguments. Initializes
+   * new ContainerBalancerConfiguration and ContainerBalancerMetrics.
+   * Container Balancer does not start on construction.
+   *
+   * @param nodeManager NodeManager
+   * @param containerManager ContainerManager
+   * @param replicationManager ReplicationManager
+   * @param ozoneConfiguration OzoneConfiguration
+   */
   public ContainerBalancer(
       NodeManager nodeManager,
       ContainerManagerV2 containerManager,
       ReplicationManager replicationManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      final SCMContext scmContext) {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.replicationManager = replicationManager;
     this.ozoneConfiguration = ozoneConfiguration;
-    this.balancerRunning = false;
     this.config = new ContainerBalancerConfiguration();
+    this.metrics = new ContainerBalancerMetrics();
+    this.scmContext = scmContext;
+
+    this.clusterCapacity = 0L;
+    this.clusterUsed = 0L;
+    this.clusterRemaining = 0L;
+
+    this.overUtilizedNodes = new ArrayList<>();
+    this.underUtilizedNodes = new ArrayList<>();
+    this.unBalancedNodes = new ArrayList<>();
+    this.withinThresholdUtilizedNodes = new ArrayList<>();
   }
 
   /**
-   * Start ContainerBalancer. Current implementation is incomplete.
+   * Starts ContainerBalancer. Current implementation is incomplete.
    *
    * @param balancerConfiguration Configuration values.
    */
-  public void start(ContainerBalancerConfiguration balancerConfiguration) {
-    this.balancerRunning = true;
+  public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
+    if (!balancerRunning.compareAndSet(false, true)) {
+      LOG.error("Container Balancer is already running.");
+      return false;
+    }
 
     ozoneConfiguration = new OzoneConfiguration();
-
-    // initialise configs
     this.config = balancerConfiguration;
     this.threshold = config.getThreshold();
-    this.maxDatanodesToBalance =
-        config.getMaxDatanodesToBalance();
+    this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
     this.maxSizeToMove = config.getMaxSizeToMove();
+    this.unBalancedNodes = new ArrayList<>();
+
+    LOG.info("Starting Container Balancer...{}", this);
+    balance();
+    return true;
+  }
 
-    LOG.info("Starting Container Balancer...");
+  /**
+   * Balances the cluster.
+   */
+  private void balance() {
+    initializeIteration();
 
+    // unBalancedNodes is not cleared since the next iteration uses this
+    // iteration's unBalancedNodes to find out how many nodes were balanced
+    overUtilizedNodes.clear();
+    underUtilizedNodes.clear();
+    withinThresholdUtilizedNodes.clear();
+  }
+
+  /**
+   * Initializes an iteration during balancing. Recognizes over, under, and
+   * within threshold utilized nodes. Decides whether balancing needs to
+   * continue or should be stopped.
+   *
+   * @return true if successfully initialized, otherwise false.
+   */
+  private boolean initializeIteration() {
+    if (scmContext.isInSafeMode()) {
+      LOG.error("Container Balancer cannot operate while SCM is in Safe 
Mode.");
+      return false;
+    }
     // sorted list in order from most to least used
-    List<DatanodeUsageInfo> nodes = nodeManager.
-        getMostOrLeastUsedDatanodes(true);
-    double avgUtilisation = calculateAvgUtilisation(nodes);
+    List<DatanodeUsageInfo> datanodeUsageInfos =
+        nodeManager.getMostOrLeastUsedDatanodes(true);
+    if (datanodeUsageInfos.isEmpty()) {
+      LOG.info("Container Balancer could not retrieve nodes from Node " +
+          "Manager.");
+      stop();
+      return false;
+    }
+
+    clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos);
+    LOG.info("Average utilization of the cluster is {}", 
clusterAvgUtilisation);
 
     // under utilized nodes have utilization(that is, used / capacity) less
     // than lower limit
-    double lowerLimit = avgUtilisation - threshold;
+    double lowerLimit = clusterAvgUtilisation - threshold;
 
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
-    double upperLimit = avgUtilisation + threshold;
-    LOG.info("Lower limit for utilization is {}", lowerLimit);
-    LOG.info("Upper limit for utilization is {}", upperLimit);
-
-    // find over utilised(source) and under utilised(target) nodes
-    sourceNodes = new ArrayList<>();
-    targetNodes = new ArrayList<>();
-//    for (DatanodeUsageInfo node : nodes) {
-//      SCMNodeStat stat = node.getScmNodeStat();
-//      double utilization = stat.getScmUsed().get().doubleValue() /
-//          stat.getCapacity().get().doubleValue();
-//      if (utilization > upperLimit) {
-//        sourceNodes.add(node);
-//      } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-//        targetNodes.add(node);
-//      }
-//    }
-  }
-
-  // calculate the average datanode utilisation across the cluster
-  private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+    double upperLimit = clusterAvgUtilisation + threshold;
+
+    LOG.info("Lower limit for utilization is {} and Upper limit for " +
+        "utilization is {}", lowerLimit, upperLimit);
+
+    long numDatanodesToBalance = 0L;
+    double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+    // find over and under utilized nodes
+    for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
+      double utilization = calculateUtilization(datanodeUsageInfo);
+      if (utilization > upperLimit) {
+        overUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes greater than upper limit in this node
+        overLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            upperLimit);
+      } else if (utilization < lowerLimit) {
+        underUtilizedNodes.add(datanodeUsageInfo);
+        numDatanodesToBalance += 1;
+
+        // amount of bytes lesser than lower limit in this node
+        underLoadedBytes += ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            lowerLimit) - ratioToBytes(
+            datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
+            utilization);
+      } else {
+        withinThresholdUtilizedNodes.add(datanodeUsageInfo);
+      }
+    }
+    Collections.reverse(underUtilizedNodes);
+
+    long numDatanodesBalanced = 0;
+    // count number of nodes that were balanced in previous iteration
+    for (DatanodeUsageInfo node : unBalancedNodes) {
+      if (!containsNode(overUtilizedNodes, node) &&
+          !containsNode(underUtilizedNodes, node)) {
+        numDatanodesBalanced += 1;
+      }
+    }
+    // calculate total number of nodes that have been balanced so far
+    numDatanodesBalanced =
+        metrics.incrementNumDatanodesBalanced(numDatanodesBalanced);
+
+    unBalancedNodes = new ArrayList<>(
+        overUtilizedNodes.size() + underUtilizedNodes.size());
+
+    if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+      LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+          "Balancer. Stopping Balancer.");
+      stop();
+      return false;
+    } else {
+      unBalancedNodes.addAll(overUtilizedNodes);
+      unBalancedNodes.addAll(underUtilizedNodes);
+
+      if (unBalancedNodes.isEmpty()) {
+        LOG.info("Did not find any unbalanced Datanodes.");
+        stop();
+        return false;
+      } else {
+        LOG.info("Container Balancer has identified Datanodes that need to be" 
+
+            " balanced.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Performs binary search to determine if the specified listToSearch
+   * contains the specified node.
+   *
+   * @param listToSearch List of DatanodeUsageInfo to be searched.
+   * @param node DatanodeUsageInfo to be searched for.
+   * @return true if the specified node is present in listToSearch, otherwise
+   * false.
+   */
+  private boolean containsNode(
+      List<DatanodeUsageInfo> listToSearch, DatanodeUsageInfo node) {
+    int index = 0;
+    Comparator<DatanodeUsageInfo> comparator =
+        DatanodeUsageInfo.getMostUsedByRemainingRatio();
+
+    if (comparator.compare(listToSearch.get(0),
+        listToSearch.get(listToSearch.size() - 1)) < 0) {
+      index =
+          Collections.binarySearch(listToSearch, node, comparator.reversed());
+    } else {
+      index = Collections.binarySearch(listToSearch, node, comparator);
+    }
+    return index >= 0 && listToSearch.get(index).equals(node);
+  }
+
+  /**
+   * Calculates the number of used bytes given capacity and utilization ratio.
+   *
+   * @param nodeCapacity capacity of the node.
+   * @param utilizationRatio used space by capacity ratio of the node.
+   * @return number of bytes
+   */
+  private double ratioToBytes(Long nodeCapacity, double utilizationRatio) {
+    return nodeCapacity * utilizationRatio;
+  }
+
+  /**
+   * Calculates the average datanode utilization for the specified nodes.
+   * Utilization is used space divided by capacity.
+   *
+   * @param nodes List of DatanodeUsageInfo to find the average utilization for
+   * @return Average utilization value
+   * @throws ArithmeticException Division by zero
+   */
+  private double calculateAvgUtilization(List<DatanodeUsageInfo> nodes)
+      throws ArithmeticException {
     SCMNodeStat aggregatedStats = new SCMNodeStat(
         0, 0, 0);
     for (DatanodeUsageInfo node : nodes) {
       aggregatedStats.add(node.getScmNodeStat());
     }
-    return aggregatedStats.getScmUsed().get().doubleValue() /
-        aggregatedStats.getCapacity().get().doubleValue();
+    clusterCapacity = aggregatedStats.getCapacity().get();
+    clusterUsed = aggregatedStats.getScmUsed().get();
+    clusterRemaining = aggregatedStats.getRemaining().get();
+
+    try {
+      return clusterUsed / (double) clusterCapacity;
+    } catch (ArithmeticException e) {

Review comment:
       NIT. better not handle `ArithmeticException`, instead check 
`nodes.size() != 0` at the entrance of function.

##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+
+@Metrics(name = "ContainerBalancer Metrics", about = "Metrics related to " +
+    "Container Balancer running in SCM", context = "SCM")
+public final class ContainerBalancerMetrics {
+
+  @Metric(about = "The total amount of used space in GigaBytes that needs to " 
+
+      "be balanced.")
+  private LongMetric totalSizeToBalanceGB;
+
+  @Metric(about = "The amount of Giga Bytes that have been moved to achieve " +
+      "balance.")
+  private LongMetric gigaBytesMoved;
+
+  @Metric(about = "Number of containers that Container Balancer has moved" +
+      " until now.")
+  private LongMetric numContainersMoved;
+
+  @Metric(about = "The total number of datanodes that need to be balanced.")
+  private LongMetric totalNumDatanodesToBalance;
+
+  @Metric(about = "Number of datanodes that Container Balancer has balanced " +
+      "until now.")
+  private LongMetric numDatanodesBalanced;
+
+  @Metric(about = "Utilisation value of the current maximum utilised 
datanode.")
+  private double maxUtilizedDatanodeRatio;

Review comment:
       NIT: `maxUtilizedDatanodeRatio` to `maxDatanodeUtilizedRatio`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to