GlenGeng commented on a change in pull request #2230:
URL: https://github.com/apache/ozone/pull/2230#discussion_r629971934
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
+ LOG.info("Container Balancer is already running.");
+ throw new RuntimeException();
+ }
this.balancerRunning = true;
-
ozoneConfiguration = new OzoneConfiguration();
- // initialise configs
this.config = balancerConfiguration;
this.threshold = config.getThreshold();
- this.maxDatanodesToBalance =
- config.getMaxDatanodesToBalance();
+ this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
this.maxSizeToMove = config.getMaxSizeToMove();
+ this.clusterCapacity = 0L;
+ this.clusterUsed = 0L;
+ this.clusterRemaining = 0L;
+
+ this.overUtilizedNodes = new ArrayList<>();
+ this.underUtilizedNodes = new ArrayList<>();
+ this.aboveAverageUtilizedNodes = new ArrayList<>();
+ this.belowAverageUtilizedNodes = new ArrayList<>();
+ this.sourceNodes = new ArrayList<>();
+
LOG.info("Starting Container Balancer...");
+ LOG.info(toString());
+
+ balance();
+ }
- // sorted list in order from most to least used
- List<DatanodeUsageInfo> nodes = nodeManager.
- getMostOrLeastUsedDatanodes(true);
- double avgUtilisation = calculateAvgUtilisation(nodes);
+ /**
+ * Balances the cluster.
+ */
+ private void balance() {
+ overUtilizedNodes.clear();
+ underUtilizedNodes.clear();
+ aboveAverageUtilizedNodes.clear();
+ belowAverageUtilizedNodes.clear();
+ initializeIteration();
+ }
+
+ /**
+ * Initializes an iteration during balancing. Recognizes over, under,
+ * below-average,and under-average utilizes nodes. Decides whether
+ * balancing needs to continue or should be stopped.
+ *
+ * @return true if successfully initialized, otherwise false.
+ */
+ private boolean initializeIteration() {
+ List<DatanodeUsageInfo> nodes;
+ try {
+ // sorted list in order from most to least used
+ nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+ } catch (NullPointerException e) {
Review comment:
Why catch an NPE here?
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
+ LOG.info("Container Balancer is already running.");
+ throw new RuntimeException();
+ }
this.balancerRunning = true;
-
ozoneConfiguration = new OzoneConfiguration();
- // initialise configs
this.config = balancerConfiguration;
this.threshold = config.getThreshold();
- this.maxDatanodesToBalance =
- config.getMaxDatanodesToBalance();
+ this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
this.maxSizeToMove = config.getMaxSizeToMove();
+ this.clusterCapacity = 0L;
+ this.clusterUsed = 0L;
+ this.clusterRemaining = 0L;
+
+ this.overUtilizedNodes = new ArrayList<>();
+ this.underUtilizedNodes = new ArrayList<>();
+ this.aboveAverageUtilizedNodes = new ArrayList<>();
+ this.belowAverageUtilizedNodes = new ArrayList<>();
+ this.sourceNodes = new ArrayList<>();
+
LOG.info("Starting Container Balancer...");
+ LOG.info(toString());
+
+ balance();
+ }
- // sorted list in order from most to least used
- List<DatanodeUsageInfo> nodes = nodeManager.
- getMostOrLeastUsedDatanodes(true);
- double avgUtilisation = calculateAvgUtilisation(nodes);
+ /**
+ * Balances the cluster.
+ */
+ private void balance() {
+ overUtilizedNodes.clear();
+ underUtilizedNodes.clear();
+ aboveAverageUtilizedNodes.clear();
+ belowAverageUtilizedNodes.clear();
+ initializeIteration();
+ }
+
+ /**
+ * Initializes an iteration during balancing. Recognizes over, under,
+ * below-average,and under-average utilizes nodes. Decides whether
+ * balancing needs to continue or should be stopped.
+ *
+ * @return true if successfully initialized, otherwise false.
+ */
+ private boolean initializeIteration() {
+ List<DatanodeUsageInfo> nodes;
+ try {
+ // sorted list in order from most to least used
+ nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+ } catch (NullPointerException e) {
+ LOG.error("Container Balancer could not retrieve nodes from Node " +
+ "Manager.", e);
+ stop();
+ return false;
+ }
+
+ try {
+ clusterAvgUtilisation = calculateAvgUtilization(nodes);
+ } catch(ArithmeticException e) {
+ LOG.warn("Container Balancer failed to initialize an iteration", e);
+ return false;
+ }
+ LOG.info("Average utilization of the cluster is {}",
clusterAvgUtilisation);
// under utilized nodes have utilization(that is, used / capacity) less
// than lower limit
- double lowerLimit = avgUtilisation - threshold;
+ double lowerLimit = clusterAvgUtilisation - threshold;
// over utilized nodes have utilization(that is, used / capacity) greater
// than upper limit
- double upperLimit = avgUtilisation + threshold;
+ double upperLimit = clusterAvgUtilisation + threshold;
+
Review comment:
NIT: merge lines 166 and 167 together.
What if `clusterAvgUtilisation` is less than `threshold`, e.g., for an empty
cluster? Does a negative `lowerLimit` make sense here?
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
+ LOG.info("Container Balancer is already running.");
+ throw new RuntimeException();
+ }
this.balancerRunning = true;
-
ozoneConfiguration = new OzoneConfiguration();
- // initialise configs
this.config = balancerConfiguration;
this.threshold = config.getThreshold();
- this.maxDatanodesToBalance =
- config.getMaxDatanodesToBalance();
+ this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
this.maxSizeToMove = config.getMaxSizeToMove();
+ this.clusterCapacity = 0L;
+ this.clusterUsed = 0L;
+ this.clusterRemaining = 0L;
+
+ this.overUtilizedNodes = new ArrayList<>();
+ this.underUtilizedNodes = new ArrayList<>();
+ this.aboveAverageUtilizedNodes = new ArrayList<>();
+ this.belowAverageUtilizedNodes = new ArrayList<>();
+ this.sourceNodes = new ArrayList<>();
+
LOG.info("Starting Container Balancer...");
+ LOG.info(toString());
+
+ balance();
+ }
- // sorted list in order from most to least used
- List<DatanodeUsageInfo> nodes = nodeManager.
- getMostOrLeastUsedDatanodes(true);
- double avgUtilisation = calculateAvgUtilisation(nodes);
+ /**
+ * Balances the cluster.
+ */
+ private void balance() {
+ overUtilizedNodes.clear();
+ underUtilizedNodes.clear();
+ aboveAverageUtilizedNodes.clear();
+ belowAverageUtilizedNodes.clear();
+ initializeIteration();
+ }
+
+ /**
+ * Initializes an iteration during balancing. Recognizes over, under,
+ * below-average,and under-average utilizes nodes. Decides whether
+ * balancing needs to continue or should be stopped.
+ *
+ * @return true if successfully initialized, otherwise false.
+ */
+ private boolean initializeIteration() {
+ List<DatanodeUsageInfo> nodes;
+ try {
+ // sorted list in order from most to least used
+ nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+ } catch (NullPointerException e) {
+ LOG.error("Container Balancer could not retrieve nodes from Node " +
+ "Manager.", e);
+ stop();
+ return false;
+ }
+
+ try {
+ clusterAvgUtilisation = calculateAvgUtilization(nodes);
+ } catch(ArithmeticException e) {
+ LOG.warn("Container Balancer failed to initialize an iteration", e);
+ return false;
+ }
+ LOG.info("Average utilization of the cluster is {}",
clusterAvgUtilisation);
// under utilized nodes have utilization(that is, used / capacity) less
// than lower limit
- double lowerLimit = avgUtilisation - threshold;
+ double lowerLimit = clusterAvgUtilisation - threshold;
// over utilized nodes have utilization(that is, used / capacity) greater
// than upper limit
- double upperLimit = avgUtilisation + threshold;
+ double upperLimit = clusterAvgUtilisation + threshold;
+
LOG.info("Lower limit for utilization is {}", lowerLimit);
LOG.info("Upper limit for utilization is {}", upperLimit);
- // find over utilised(source) and under utilised(target) nodes
- sourceNodes = new ArrayList<>();
- targetNodes = new ArrayList<>();
-// for (DatanodeUsageInfo node : nodes) {
-// SCMNodeStat stat = node.getScmNodeStat();
-// double utilization = stat.getScmUsed().get().doubleValue() /
-// stat.getCapacity().get().doubleValue();
-// if (utilization > upperLimit) {
-// sourceNodes.add(node);
-// } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-// targetNodes.add(node);
-// }
-// }
- }
-
- // calculate the average datanode utilisation across the cluster
- private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+ long numDatanodesToBalance = 0L;
+ double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+ // find over and under utilized nodes
+ for (DatanodeUsageInfo node : nodes) {
+ double utilization = calculateUtilization(node);
+ if (utilization > clusterAvgUtilisation) {
+ if (utilization > upperLimit) {
+ overUtilizedNodes.add(node);
+ numDatanodesToBalance += 1;
+
+ // amount of bytes greater than upper limit in this node
+ overLoadedBytes +=
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ utilization) -
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ upperLimit);
+ } else {
+ aboveAverageUtilizedNodes.add(node);
+ }
+ } else if (utilization < clusterAvgUtilisation) {
+ if (utilization < lowerLimit) {
+ underUtilizedNodes.add(node);
+ numDatanodesToBalance += 1;
+
+ // amount of bytes lesser than lower limit in this node
+ underLoadedBytes +=
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ lowerLimit) -
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ utilization);
+ } else {
+ belowAverageUtilizedNodes.add(node);
+ }
+ }
+ }
+
+ Collections.reverse(underUtilizedNodes);
+ Collections.reverse(belowAverageUtilizedNodes);
+
+ long numDatanodesBalanced = 0;
+ // count number of nodes that were balanced in previous iteration
+ for (DatanodeUsageInfo node : sourceNodes) {
+ if (!containsNode(overUtilizedNodes, node) &&
Review comment:
You can use `CollectionUtils.intersection()` and `CollectionUtils.union()`
to simplify the code.
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
+ LOG.info("Container Balancer is already running.");
+ throw new RuntimeException();
+ }
this.balancerRunning = true;
-
ozoneConfiguration = new OzoneConfiguration();
- // initialise configs
this.config = balancerConfiguration;
this.threshold = config.getThreshold();
- this.maxDatanodesToBalance =
- config.getMaxDatanodesToBalance();
+ this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
this.maxSizeToMove = config.getMaxSizeToMove();
+ this.clusterCapacity = 0L;
+ this.clusterUsed = 0L;
+ this.clusterRemaining = 0L;
+
+ this.overUtilizedNodes = new ArrayList<>();
+ this.underUtilizedNodes = new ArrayList<>();
+ this.aboveAverageUtilizedNodes = new ArrayList<>();
+ this.belowAverageUtilizedNodes = new ArrayList<>();
+ this.sourceNodes = new ArrayList<>();
+
LOG.info("Starting Container Balancer...");
+ LOG.info(toString());
+
+ balance();
+ }
- // sorted list in order from most to least used
- List<DatanodeUsageInfo> nodes = nodeManager.
- getMostOrLeastUsedDatanodes(true);
- double avgUtilisation = calculateAvgUtilisation(nodes);
+ /**
+ * Balances the cluster.
+ */
+ private void balance() {
+ overUtilizedNodes.clear();
+ underUtilizedNodes.clear();
+ aboveAverageUtilizedNodes.clear();
+ belowAverageUtilizedNodes.clear();
+ initializeIteration();
+ }
+
+ /**
+ * Initializes an iteration during balancing. Recognizes over, under,
+ * below-average,and under-average utilizes nodes. Decides whether
+ * balancing needs to continue or should be stopped.
+ *
+ * @return true if successfully initialized, otherwise false.
+ */
+ private boolean initializeIteration() {
+ List<DatanodeUsageInfo> nodes;
+ try {
+ // sorted list in order from most to least used
+ nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+ } catch (NullPointerException e) {
+ LOG.error("Container Balancer could not retrieve nodes from Node " +
+ "Manager.", e);
+ stop();
+ return false;
+ }
+
+ try {
+ clusterAvgUtilisation = calculateAvgUtilization(nodes);
+ } catch(ArithmeticException e) {
+ LOG.warn("Container Balancer failed to initialize an iteration", e);
+ return false;
+ }
+ LOG.info("Average utilization of the cluster is {}",
clusterAvgUtilisation);
// under utilized nodes have utilization(that is, used / capacity) less
// than lower limit
- double lowerLimit = avgUtilisation - threshold;
+ double lowerLimit = clusterAvgUtilisation - threshold;
// over utilized nodes have utilization(that is, used / capacity) greater
// than upper limit
- double upperLimit = avgUtilisation + threshold;
+ double upperLimit = clusterAvgUtilisation + threshold;
+
LOG.info("Lower limit for utilization is {}", lowerLimit);
LOG.info("Upper limit for utilization is {}", upperLimit);
- // find over utilised(source) and under utilised(target) nodes
- sourceNodes = new ArrayList<>();
- targetNodes = new ArrayList<>();
-// for (DatanodeUsageInfo node : nodes) {
-// SCMNodeStat stat = node.getScmNodeStat();
-// double utilization = stat.getScmUsed().get().doubleValue() /
-// stat.getCapacity().get().doubleValue();
-// if (utilization > upperLimit) {
-// sourceNodes.add(node);
-// } else if (utilization < lowerLimit || utilization < avgUtilisation) {
-// targetNodes.add(node);
-// }
-// }
- }
-
- // calculate the average datanode utilisation across the cluster
- private double calculateAvgUtilisation(List<DatanodeUsageInfo> nodes) {
+ long numDatanodesToBalance = 0L;
+ double overLoadedBytes = 0D, underLoadedBytes = 0D;
+
+ // find over and under utilized nodes
+ for (DatanodeUsageInfo node : nodes) {
+ double utilization = calculateUtilization(node);
+ if (utilization > clusterAvgUtilisation) {
+ if (utilization > upperLimit) {
+ overUtilizedNodes.add(node);
+ numDatanodesToBalance += 1;
+
+ // amount of bytes greater than upper limit in this node
+ overLoadedBytes +=
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ utilization) -
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ upperLimit);
+ } else {
+ aboveAverageUtilizedNodes.add(node);
+ }
+ } else if (utilization < clusterAvgUtilisation) {
+ if (utilization < lowerLimit) {
+ underUtilizedNodes.add(node);
+ numDatanodesToBalance += 1;
+
+ // amount of bytes lesser than lower limit in this node
+ underLoadedBytes +=
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ lowerLimit) -
+ ratioToBytes(node.getScmNodeStat().getCapacity().get(),
+ utilization);
+ } else {
+ belowAverageUtilizedNodes.add(node);
+ }
+ }
+ }
+
+ Collections.reverse(underUtilizedNodes);
+ Collections.reverse(belowAverageUtilizedNodes);
+
+ long numDatanodesBalanced = 0;
+ // count number of nodes that were balanced in previous iteration
+ for (DatanodeUsageInfo node : sourceNodes) {
+ if (!containsNode(overUtilizedNodes, node) &&
+ !containsNode(underUtilizedNodes, node)) {
+ numDatanodesBalanced += 1;
+ }
+ }
+
+ // calculate total number of nodes that have been balanced
+ numDatanodesBalanced =
+ numDatanodesBalanced + metrics.getNumDatanodesBalanced().get();
+ metrics.setNumDatanodesBalanced(new LongMetric(numDatanodesBalanced));
+ sourceNodes = new ArrayList<>(
+ overUtilizedNodes.size() + underUtilizedNodes.size());
+
+ if (numDatanodesBalanced + numDatanodesToBalance > maxDatanodesToBalance) {
+ LOG.info("Approaching Max Datanodes To Balance limit in Container " +
+ "Balancer. Stopping Balancer.");
+ stop();
+ return false;
+ } else {
+ sourceNodes.addAll(overUtilizedNodes);
+ sourceNodes.addAll(underUtilizedNodes);
+
+ if (sourceNodes.isEmpty()) {
+ LOG.info("Did not find any unbalanced Datanodes.");
+ stop();
+ return false;
+ } else {
+ LOG.info("Container Balancer has identified Datanodes that need to be"
+
+ " balanced.");
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Performs binary search to determine if the specified listToSearch
+ * contains the specified node.
+ *
+ * @param listToSearch List of DatanodeUsageInfo to be searched.
+ * @param node DatanodeUsageInfo to be searched for.
+ * @return true if the specified node is present in listToSearch, otherwise
+ * false.
+ */
+ private boolean containsNode(
Review comment:
NIT: `containsNode` is called multiple times, so it is better to change
`listToSearch` to a hash set and do an existence check, which would be
simpler and quicker. For example, declare `overUtilizedNodes` and
`underUtilizedNodes` as hash sets.
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
+ LOG.info("Container Balancer is already running.");
+ throw new RuntimeException();
+ }
this.balancerRunning = true;
-
ozoneConfiguration = new OzoneConfiguration();
- // initialise configs
this.config = balancerConfiguration;
this.threshold = config.getThreshold();
- this.maxDatanodesToBalance =
- config.getMaxDatanodesToBalance();
+ this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
this.maxSizeToMove = config.getMaxSizeToMove();
+ this.clusterCapacity = 0L;
+ this.clusterUsed = 0L;
+ this.clusterRemaining = 0L;
+
+ this.overUtilizedNodes = new ArrayList<>();
+ this.underUtilizedNodes = new ArrayList<>();
+ this.aboveAverageUtilizedNodes = new ArrayList<>();
+ this.belowAverageUtilizedNodes = new ArrayList<>();
+ this.sourceNodes = new ArrayList<>();
+
LOG.info("Starting Container Balancer...");
+ LOG.info(toString());
+
+ balance();
+ }
- // sorted list in order from most to least used
- List<DatanodeUsageInfo> nodes = nodeManager.
- getMostOrLeastUsedDatanodes(true);
- double avgUtilisation = calculateAvgUtilisation(nodes);
+ /**
+ * Balances the cluster.
+ */
+ private void balance() {
+ overUtilizedNodes.clear();
+ underUtilizedNodes.clear();
+ aboveAverageUtilizedNodes.clear();
+ belowAverageUtilizedNodes.clear();
+ initializeIteration();
+ }
+
+ /**
+ * Initializes an iteration during balancing. Recognizes over, under,
+ * below-average,and under-average utilizes nodes. Decides whether
+ * balancing needs to continue or should be stopped.
+ *
+ * @return true if successfully initialized, otherwise false.
+ */
+ private boolean initializeIteration() {
+ List<DatanodeUsageInfo> nodes;
+ try {
+ // sorted list in order from most to least used
+ nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
+ } catch (NullPointerException e) {
+ LOG.error("Container Balancer could not retrieve nodes from Node " +
+ "Manager.", e);
+ stop();
+ return false;
+ }
+
+ try {
+ clusterAvgUtilisation = calculateAvgUtilization(nodes);
Review comment:
`ArithmeticException` means `nodes` is empty, which leads to a division by zero.
How about skipping this iteration if `nodes` is empty? For example:
```
nodes = nodeManager.getMostOrLeastUsedDatanodes(true);
if (nodes.isEmpty()) {
return true;
}
```
The balancer should not run if SCM has not heard from any datanodes.
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
+ LOG.info("Container Balancer is already running.");
+ throw new RuntimeException();
+ }
this.balancerRunning = true;
-
ozoneConfiguration = new OzoneConfiguration();
- // initialise configs
this.config = balancerConfiguration;
this.threshold = config.getThreshold();
- this.maxDatanodesToBalance =
- config.getMaxDatanodesToBalance();
+ this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
this.maxSizeToMove = config.getMaxSizeToMove();
+ this.clusterCapacity = 0L;
+ this.clusterUsed = 0L;
+ this.clusterRemaining = 0L;
+
+ this.overUtilizedNodes = new ArrayList<>();
+ this.underUtilizedNodes = new ArrayList<>();
+ this.aboveAverageUtilizedNodes = new ArrayList<>();
+ this.belowAverageUtilizedNodes = new ArrayList<>();
+ this.sourceNodes = new ArrayList<>();
+
LOG.info("Starting Container Balancer...");
+ LOG.info(toString());
Review comment:
NIT: merge the two info log statements into one. In a multi-threaded context,
other log lines might be interleaved between lines 114 and 115.
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
+ LOG.info("Container Balancer is already running.");
+ throw new RuntimeException();
+ }
this.balancerRunning = true;
-
ozoneConfiguration = new OzoneConfiguration();
- // initialise configs
this.config = balancerConfiguration;
this.threshold = config.getThreshold();
- this.maxDatanodesToBalance =
- config.getMaxDatanodesToBalance();
+ this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
this.maxSizeToMove = config.getMaxSizeToMove();
+ this.clusterCapacity = 0L;
+ this.clusterUsed = 0L;
+ this.clusterRemaining = 0L;
+
+ this.overUtilizedNodes = new ArrayList<>();
+ this.underUtilizedNodes = new ArrayList<>();
+ this.aboveAverageUtilizedNodes = new ArrayList<>();
+ this.belowAverageUtilizedNodes = new ArrayList<>();
+ this.sourceNodes = new ArrayList<>();
+
LOG.info("Starting Container Balancer...");
+ LOG.info(toString());
+
+ balance();
+ }
- // sorted list in order from most to least used
- List<DatanodeUsageInfo> nodes = nodeManager.
- getMostOrLeastUsedDatanodes(true);
- double avgUtilisation = calculateAvgUtilisation(nodes);
+ /**
+ * Balances the cluster.
+ */
+ private void balance() {
+ overUtilizedNodes.clear();
+ underUtilizedNodes.clear();
+ aboveAverageUtilizedNodes.clear();
+ belowAverageUtilizedNodes.clear();
+ initializeIteration();
+ }
+
+ /**
+ * Initializes an iteration during balancing. Recognizes over, under,
+ * below-average,and under-average utilizes nodes. Decides whether
+ * balancing needs to continue or should be stopped.
+ *
+ * @return true if successfully initialized, otherwise false.
+ */
+ private boolean initializeIteration() {
+ List<DatanodeUsageInfo> nodes;
Review comment:
NIT: rename this to `nodeUsageInfos`; `nodes` is misleading here.
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
##########
@@ -59,74 +80,321 @@ public ContainerBalancer(
this.ozoneConfiguration = ozoneConfiguration;
this.balancerRunning = false;
this.config = new ContainerBalancerConfiguration();
+ this.metrics = new ContainerBalancerMetrics();
}
/**
- * Start ContainerBalancer. Current implementation is incomplete.
+ * Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
*/
public void start(ContainerBalancerConfiguration balancerConfiguration) {
+ if (balancerRunning) {
Review comment:
NIT: Better to use a lock-free variable to avoid contention, and log an
error instead of throwing a RuntimeException.
```
private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
public void start(ContainerBalancerConfiguration balancerConfiguration) {
if (!balancerRunning.compareAndSet(false, true)) {
LOG.error("Container Balancer is already running.");
return;
}
///
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]