siddhantsangwan commented on code in PR #3782: URL: https://github.com/apache/ozone/pull/3782#discussion_r994670075
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java: ########## @@ -0,0 +1,1032 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import 
org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT; + +/** + * Container balancer is a service in SCM to move containers between over- and + * under-utilized datanodes. 
+ */ +public class ContainerBalancerTask implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(ContainerBalancerTask.class); + + private NodeManager nodeManager; + private ContainerManager containerManager; + private ReplicationManager replicationManager; + private OzoneConfiguration ozoneConfiguration; + private ContainerBalancer containerBalancer; + private final SCMContext scmContext; + private double threshold; + private int totalNodesInCluster; + private double maxDatanodesRatioToInvolvePerIteration; + private long maxSizeToMovePerIteration; + private int countDatanodesInvolvedPerIteration; + private long sizeScheduledForMoveInLatestIteration; + // count actual size moved in bytes + private long sizeActuallyMovedInLatestIteration; + private int iterations; + private List<DatanodeUsageInfo> unBalancedNodes; + private List<DatanodeUsageInfo> overUtilizedNodes; + private List<DatanodeUsageInfo> underUtilizedNodes; + private List<DatanodeUsageInfo> withinThresholdUtilizedNodes; + private Set<String> excludeNodes; + private Set<String> includeNodes; + private ContainerBalancerConfiguration config; + private ContainerBalancerMetrics metrics; + private long clusterCapacity; + private long clusterRemaining; + private double clusterAvgUtilisation; + private PlacementPolicyValidateProxy placementPolicyValidateProxy; + private NetworkTopology networkTopology; + private double upperLimit; + private double lowerLimit; + private ContainerBalancerSelectionCriteria selectionCriteria; + private volatile Status taskStatus = Status.RUNNING; + + /* + Since a container can be selected only once during an iteration, these maps + use it as a primary key to track source to target pairings. 
+ */ + private final Map<ContainerID, DatanodeDetails> containerToSourceMap; + private final Map<ContainerID, DatanodeDetails> containerToTargetMap; + + private Set<DatanodeDetails> selectedTargets; + private Set<DatanodeDetails> selectedSources; + private FindTargetStrategy findTargetStrategy; + private FindSourceStrategy findSourceStrategy; + private Map<ContainerMoveSelection, + CompletableFuture<LegacyReplicationManager.MoveResult>> + moveSelectionToFutureMap; + private IterationResult iterationResult; + private int nextIterationIndex; + + /** + * Constructs ContainerBalancer with the specified arguments. Initializes + * ContainerBalancerMetrics. Container Balancer does not start on + * construction. + * + * @param scm the storage container manager + */ + public ContainerBalancerTask(StorageContainerManager scm, + int nextIterationIndex, + ContainerBalancer containerBalancer, + ContainerBalancerMetrics metrics, + ContainerBalancerConfiguration config) { + this.nodeManager = scm.getScmNodeManager(); + this.containerManager = scm.getContainerManager(); + this.replicationManager = scm.getReplicationManager(); + this.ozoneConfiguration = scm.getConfiguration(); + this.containerBalancer = containerBalancer; + this.config = config; + this.metrics = metrics; + this.scmContext = scm.getScmContext(); + this.overUtilizedNodes = new ArrayList<>(); + this.underUtilizedNodes = new ArrayList<>(); + this.withinThresholdUtilizedNodes = new ArrayList<>(); + this.unBalancedNodes = new ArrayList<>(); + this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy(); + this.networkTopology = scm.getClusterMap(); + this.nextIterationIndex = nextIterationIndex; + this.containerToSourceMap = new HashMap<>(); + this.containerToTargetMap = new HashMap<>(); + this.selectedSources = new HashSet<>(); + this.selectedTargets = new HashSet<>(); + findSourceStrategy = new FindSourceGreedy(nodeManager); + } + + /** + * Balances the cluster in iterations. 
Regularly checks if balancing has + * been stopped. + */ + public void run() { + try { + balancer(); + } catch (Throwable e) { + LOG.error("Container Balancer is stopped abnormally, ", e); + } Review Comment: Why are we catching a Throwable here? ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java: ########## @@ -0,0 +1,1032 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT; + +/** 
+ * Container balancer is a service in SCM to move containers between over- and + * under-utilized datanodes. + */ +public class ContainerBalancerTask implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(ContainerBalancerTask.class); + + private NodeManager nodeManager; + private ContainerManager containerManager; + private ReplicationManager replicationManager; + private OzoneConfiguration ozoneConfiguration; + private ContainerBalancer containerBalancer; + private final SCMContext scmContext; + private double threshold; + private int totalNodesInCluster; + private double maxDatanodesRatioToInvolvePerIteration; + private long maxSizeToMovePerIteration; + private int countDatanodesInvolvedPerIteration; + private long sizeScheduledForMoveInLatestIteration; + // count actual size moved in bytes + private long sizeActuallyMovedInLatestIteration; + private int iterations; + private List<DatanodeUsageInfo> unBalancedNodes; + private List<DatanodeUsageInfo> overUtilizedNodes; + private List<DatanodeUsageInfo> underUtilizedNodes; + private List<DatanodeUsageInfo> withinThresholdUtilizedNodes; + private Set<String> excludeNodes; + private Set<String> includeNodes; + private ContainerBalancerConfiguration config; + private ContainerBalancerMetrics metrics; + private long clusterCapacity; + private long clusterRemaining; + private double clusterAvgUtilisation; + private PlacementPolicyValidateProxy placementPolicyValidateProxy; + private NetworkTopology networkTopology; + private double upperLimit; + private double lowerLimit; + private ContainerBalancerSelectionCriteria selectionCriteria; + private volatile Status taskStatus = Status.RUNNING; + + /* + Since a container can be selected only once during an iteration, these maps + use it as a primary key to track source to target pairings. 
+ */ + private final Map<ContainerID, DatanodeDetails> containerToSourceMap; + private final Map<ContainerID, DatanodeDetails> containerToTargetMap; + + private Set<DatanodeDetails> selectedTargets; + private Set<DatanodeDetails> selectedSources; + private FindTargetStrategy findTargetStrategy; + private FindSourceStrategy findSourceStrategy; + private Map<ContainerMoveSelection, + CompletableFuture<LegacyReplicationManager.MoveResult>> + moveSelectionToFutureMap; + private IterationResult iterationResult; + private int nextIterationIndex; + + /** + * Constructs ContainerBalancer with the specified arguments. Initializes + * ContainerBalancerMetrics. Container Balancer does not start on + * construction. + * + * @param scm the storage container manager + */ + public ContainerBalancerTask(StorageContainerManager scm, + int nextIterationIndex, + ContainerBalancer containerBalancer, + ContainerBalancerMetrics metrics, + ContainerBalancerConfiguration config) { + this.nodeManager = scm.getScmNodeManager(); + this.containerManager = scm.getContainerManager(); + this.replicationManager = scm.getReplicationManager(); + this.ozoneConfiguration = scm.getConfiguration(); + this.containerBalancer = containerBalancer; + this.config = config; + this.metrics = metrics; + this.scmContext = scm.getScmContext(); + this.overUtilizedNodes = new ArrayList<>(); + this.underUtilizedNodes = new ArrayList<>(); + this.withinThresholdUtilizedNodes = new ArrayList<>(); + this.unBalancedNodes = new ArrayList<>(); + this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy(); + this.networkTopology = scm.getClusterMap(); + this.nextIterationIndex = nextIterationIndex; + this.containerToSourceMap = new HashMap<>(); + this.containerToTargetMap = new HashMap<>(); + this.selectedSources = new HashSet<>(); + this.selectedTargets = new HashSet<>(); + findSourceStrategy = new FindSourceGreedy(nodeManager); + } + + /** + * Balances the cluster in iterations. 
Regularly checks if balancing has + * been stopped. + */ Review Comment: This Javadoc on `run()` seems wrong: it describes what the `balancer` method does, so it should probably be moved to `balancer`. ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java: ########## @@ -0,0 +1,1032 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT; + +/** 
+ * Container balancer is a service in SCM to move containers between over- and + * under-utilized datanodes. + */ +public class ContainerBalancerTask implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(ContainerBalancerTask.class); + + private NodeManager nodeManager; + private ContainerManager containerManager; + private ReplicationManager replicationManager; + private OzoneConfiguration ozoneConfiguration; + private ContainerBalancer containerBalancer; + private final SCMContext scmContext; + private double threshold; + private int totalNodesInCluster; + private double maxDatanodesRatioToInvolvePerIteration; + private long maxSizeToMovePerIteration; + private int countDatanodesInvolvedPerIteration; + private long sizeScheduledForMoveInLatestIteration; + // count actual size moved in bytes + private long sizeActuallyMovedInLatestIteration; + private int iterations; + private List<DatanodeUsageInfo> unBalancedNodes; + private List<DatanodeUsageInfo> overUtilizedNodes; + private List<DatanodeUsageInfo> underUtilizedNodes; + private List<DatanodeUsageInfo> withinThresholdUtilizedNodes; + private Set<String> excludeNodes; + private Set<String> includeNodes; + private ContainerBalancerConfiguration config; + private ContainerBalancerMetrics metrics; + private long clusterCapacity; + private long clusterRemaining; + private double clusterAvgUtilisation; + private PlacementPolicyValidateProxy placementPolicyValidateProxy; + private NetworkTopology networkTopology; + private double upperLimit; + private double lowerLimit; + private ContainerBalancerSelectionCriteria selectionCriteria; + private volatile Status taskStatus = Status.RUNNING; + + /* + Since a container can be selected only once during an iteration, these maps + use it as a primary key to track source to target pairings. 
+ */ + private final Map<ContainerID, DatanodeDetails> containerToSourceMap; + private final Map<ContainerID, DatanodeDetails> containerToTargetMap; + + private Set<DatanodeDetails> selectedTargets; + private Set<DatanodeDetails> selectedSources; + private FindTargetStrategy findTargetStrategy; + private FindSourceStrategy findSourceStrategy; + private Map<ContainerMoveSelection, + CompletableFuture<LegacyReplicationManager.MoveResult>> + moveSelectionToFutureMap; + private IterationResult iterationResult; + private int nextIterationIndex; + + /** + * Constructs ContainerBalancer with the specified arguments. Initializes + * ContainerBalancerMetrics. Container Balancer does not start on + * construction. + * + * @param scm the storage container manager + */ + public ContainerBalancerTask(StorageContainerManager scm, + int nextIterationIndex, + ContainerBalancer containerBalancer, + ContainerBalancerMetrics metrics, + ContainerBalancerConfiguration config) { + this.nodeManager = scm.getScmNodeManager(); + this.containerManager = scm.getContainerManager(); + this.replicationManager = scm.getReplicationManager(); + this.ozoneConfiguration = scm.getConfiguration(); + this.containerBalancer = containerBalancer; + this.config = config; + this.metrics = metrics; + this.scmContext = scm.getScmContext(); + this.overUtilizedNodes = new ArrayList<>(); + this.underUtilizedNodes = new ArrayList<>(); + this.withinThresholdUtilizedNodes = new ArrayList<>(); + this.unBalancedNodes = new ArrayList<>(); + this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy(); + this.networkTopology = scm.getClusterMap(); + this.nextIterationIndex = nextIterationIndex; + this.containerToSourceMap = new HashMap<>(); + this.containerToTargetMap = new HashMap<>(); + this.selectedSources = new HashSet<>(); + this.selectedTargets = new HashSet<>(); + findSourceStrategy = new FindSourceGreedy(nodeManager); + } + + /** + * Balances the cluster in iterations. 
Regularly checks if balancing has + * been stopped. + */ + public void run() { + try { + balancer(); + } catch (Throwable e) { + LOG.error("Container Balancer is stopped abnormally, ", e); + } + synchronized (this) { + taskStatus = Status.STOPPED; + } + } + + /** + * Tries to stop ContainerBalancer changing status to stopping. Calls + * {@link ContainerBalancerTask#stop()}. + */ + public void stop() { + synchronized (this) { + if (taskStatus == Status.RUNNING) { + taskStatus = Status.STOPPING; + } + } + } + + private void balancer() { Review Comment: Let's change this from `balancer()` to `balance()`. ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java: ########## @@ -0,0 +1,1032 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT; + +/** 
+ * Container balancer is a service in SCM to move containers between over- and + * under-utilized datanodes. + */ +public class ContainerBalancerTask implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(ContainerBalancerTask.class); + + private NodeManager nodeManager; + private ContainerManager containerManager; + private ReplicationManager replicationManager; + private OzoneConfiguration ozoneConfiguration; + private ContainerBalancer containerBalancer; + private final SCMContext scmContext; + private double threshold; + private int totalNodesInCluster; + private double maxDatanodesRatioToInvolvePerIteration; + private long maxSizeToMovePerIteration; + private int countDatanodesInvolvedPerIteration; + private long sizeScheduledForMoveInLatestIteration; + // count actual size moved in bytes + private long sizeActuallyMovedInLatestIteration; + private int iterations; + private List<DatanodeUsageInfo> unBalancedNodes; + private List<DatanodeUsageInfo> overUtilizedNodes; + private List<DatanodeUsageInfo> underUtilizedNodes; + private List<DatanodeUsageInfo> withinThresholdUtilizedNodes; + private Set<String> excludeNodes; + private Set<String> includeNodes; + private ContainerBalancerConfiguration config; + private ContainerBalancerMetrics metrics; + private long clusterCapacity; + private long clusterRemaining; + private double clusterAvgUtilisation; + private PlacementPolicyValidateProxy placementPolicyValidateProxy; + private NetworkTopology networkTopology; + private double upperLimit; + private double lowerLimit; + private ContainerBalancerSelectionCriteria selectionCriteria; + private volatile Status taskStatus = Status.RUNNING; + + /* + Since a container can be selected only once during an iteration, these maps + use it as a primary key to track source to target pairings. 
+ */ + private final Map<ContainerID, DatanodeDetails> containerToSourceMap; + private final Map<ContainerID, DatanodeDetails> containerToTargetMap; + + private Set<DatanodeDetails> selectedTargets; + private Set<DatanodeDetails> selectedSources; + private FindTargetStrategy findTargetStrategy; + private FindSourceStrategy findSourceStrategy; + private Map<ContainerMoveSelection, + CompletableFuture<LegacyReplicationManager.MoveResult>> + moveSelectionToFutureMap; + private IterationResult iterationResult; + private int nextIterationIndex; + + /** + * Constructs ContainerBalancer with the specified arguments. Initializes + * ContainerBalancerMetrics. Container Balancer does not start on + * construction. + * + * @param scm the storage container manager + */ + public ContainerBalancerTask(StorageContainerManager scm, + int nextIterationIndex, + ContainerBalancer containerBalancer, + ContainerBalancerMetrics metrics, + ContainerBalancerConfiguration config) { + this.nodeManager = scm.getScmNodeManager(); + this.containerManager = scm.getContainerManager(); + this.replicationManager = scm.getReplicationManager(); + this.ozoneConfiguration = scm.getConfiguration(); + this.containerBalancer = containerBalancer; + this.config = config; + this.metrics = metrics; + this.scmContext = scm.getScmContext(); + this.overUtilizedNodes = new ArrayList<>(); + this.underUtilizedNodes = new ArrayList<>(); + this.withinThresholdUtilizedNodes = new ArrayList<>(); + this.unBalancedNodes = new ArrayList<>(); + this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy(); + this.networkTopology = scm.getClusterMap(); + this.nextIterationIndex = nextIterationIndex; + this.containerToSourceMap = new HashMap<>(); + this.containerToTargetMap = new HashMap<>(); + this.selectedSources = new HashSet<>(); + this.selectedTargets = new HashSet<>(); + findSourceStrategy = new FindSourceGreedy(nodeManager); + } + + /** + * Balances the cluster in iterations. 
Regularly checks if balancing has + * been stopped. + */ + public void run() { + try { + balancer(); + } catch (Throwable e) { + LOG.error("Container Balancer is stopped abnormally, ", e); + } + synchronized (this) { + taskStatus = Status.STOPPED; + } + } + + /** + * Tries to stop ContainerBalancer changing status to stopping. Calls + * {@link ContainerBalancerTask#stop()}. Review Comment: ```suggestion * Changes Status from RUNNING to STOPPING. ``` ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java: ########## @@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException, } @Test - public void testCalculationOfUtilization() { - Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size()); - for (int i = 0; i < nodesInCluster.size(); i++) { - Assertions.assertEquals(nodeUtilizations.get(i), - nodesInCluster.get(i).calculateUtilization(), 0.0001); - } - + public void testShouldRun() throws Exception { + boolean doRun = containerBalancer.shouldRun(); // should be equal to average utilization of the cluster - Assertions.assertEquals(averageUtilization, - containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001); - } - - /** - * Checks whether ContainerBalancer is correctly updating the list of - * unBalanced nodes with varying values of Threshold. - */ - @Test - public void - initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - List<DatanodeUsageInfo> expectedUnBalancedNodes; - List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer; - - // check for random threshold values - for (int i = 0; i < 50; i++) { - double randomThreshold = RANDOM.nextDouble() * 100; - - balancerConfiguration.setThreshold(randomThreshold); - startBalancer(balancerConfiguration); - - // waiting for balance completed. 
- // TODO: this is a temporary implementation for now - // modify this after balancer is fully completed - try { - Thread.sleep(100); - } catch (InterruptedException e) { } - - expectedUnBalancedNodes = - determineExpectedUnBalancedNodes(randomThreshold); - unBalancedNodesAccordingToBalancer = - containerBalancer.getUnBalancedNodes(); - - stopBalancer(); - Assertions.assertEquals( - expectedUnBalancedNodes.size(), - unBalancedNodesAccordingToBalancer.size()); - - for (int j = 0; j < expectedUnBalancedNodes.size(); j++) { - Assertions.assertEquals( - expectedUnBalancedNodes.get(j).getDatanodeDetails(), - unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails()); - } - } + Assertions.assertEquals(doRun, false); + containerBalancer.saveConfiguration(balancerConfiguration, true, 0); + doRun = containerBalancer.shouldRun(); + Assertions.assertEquals(doRun, true); Review Comment: ```suggestion Assertions.assertTrue(doRun); ``` ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java: ########## @@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException, } @Test - public void testCalculationOfUtilization() { - Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size()); - for (int i = 0; i < nodesInCluster.size(); i++) { - Assertions.assertEquals(nodeUtilizations.get(i), - nodesInCluster.get(i).calculateUtilization(), 0.0001); - } - + public void testShouldRun() throws Exception { + boolean doRun = containerBalancer.shouldRun(); // should be equal to average utilization of the cluster - Assertions.assertEquals(averageUtilization, - containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001); - } - - /** - * Checks whether ContainerBalancer is correctly updating the list of - * unBalanced nodes with varying values of Threshold. 
- */ - @Test - public void - initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - List<DatanodeUsageInfo> expectedUnBalancedNodes; - List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer; - - // check for random threshold values - for (int i = 0; i < 50; i++) { - double randomThreshold = RANDOM.nextDouble() * 100; - - balancerConfiguration.setThreshold(randomThreshold); - startBalancer(balancerConfiguration); - - // waiting for balance completed. - // TODO: this is a temporary implementation for now - // modify this after balancer is fully completed - try { - Thread.sleep(100); - } catch (InterruptedException e) { } - - expectedUnBalancedNodes = - determineExpectedUnBalancedNodes(randomThreshold); - unBalancedNodesAccordingToBalancer = - containerBalancer.getUnBalancedNodes(); - - stopBalancer(); - Assertions.assertEquals( - expectedUnBalancedNodes.size(), - unBalancedNodesAccordingToBalancer.size()); - - for (int j = 0; j < expectedUnBalancedNodes.size(); j++) { - Assertions.assertEquals( - expectedUnBalancedNodes.get(j).getDatanodeDetails(), - unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails()); - } - } + Assertions.assertEquals(doRun, false); Review Comment: ```suggestion Assertions.assertFalse(doRun); ``` ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java: ########## @@ -0,0 +1,1032 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import 
java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT; + +/** + * Container balancer is a service in SCM to move containers between over- and + * under-utilized datanodes. + */ +public class ContainerBalancerTask implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(ContainerBalancerTask.class); + + private NodeManager nodeManager; + private ContainerManager containerManager; + private ReplicationManager replicationManager; + private OzoneConfiguration ozoneConfiguration; + private ContainerBalancer containerBalancer; + private final SCMContext scmContext; + private double threshold; + private int totalNodesInCluster; + private double maxDatanodesRatioToInvolvePerIteration; + private long maxSizeToMovePerIteration; + private int countDatanodesInvolvedPerIteration; + private long sizeScheduledForMoveInLatestIteration; + // count actual size moved in bytes + private long sizeActuallyMovedInLatestIteration; + private int iterations; + private List<DatanodeUsageInfo> unBalancedNodes; + private List<DatanodeUsageInfo> overUtilizedNodes; + private List<DatanodeUsageInfo> underUtilizedNodes; + private List<DatanodeUsageInfo> withinThresholdUtilizedNodes; + private Set<String> excludeNodes; + private Set<String> includeNodes; + private ContainerBalancerConfiguration config; + private ContainerBalancerMetrics metrics; + private long clusterCapacity; + private long clusterRemaining; + private double clusterAvgUtilisation; + private PlacementPolicyValidateProxy placementPolicyValidateProxy; + private NetworkTopology networkTopology; + private double upperLimit; + private double 
lowerLimit; + private ContainerBalancerSelectionCriteria selectionCriteria; + private volatile Status taskStatus = Status.RUNNING; + + /* + Since a container can be selected only once during an iteration, these maps + use it as a primary key to track source to target pairings. + */ + private final Map<ContainerID, DatanodeDetails> containerToSourceMap; + private final Map<ContainerID, DatanodeDetails> containerToTargetMap; + + private Set<DatanodeDetails> selectedTargets; + private Set<DatanodeDetails> selectedSources; + private FindTargetStrategy findTargetStrategy; + private FindSourceStrategy findSourceStrategy; + private Map<ContainerMoveSelection, + CompletableFuture<LegacyReplicationManager.MoveResult>> + moveSelectionToFutureMap; + private IterationResult iterationResult; + private int nextIterationIndex; + + /** + * Constructs ContainerBalancer with the specified arguments. Initializes + * ContainerBalancerMetrics. Container Balancer does not start on + * construction. + * + * @param scm the storage container manager + */ + public ContainerBalancerTask(StorageContainerManager scm, + int nextIterationIndex, + ContainerBalancer containerBalancer, + ContainerBalancerMetrics metrics, + ContainerBalancerConfiguration config) { + this.nodeManager = scm.getScmNodeManager(); + this.containerManager = scm.getContainerManager(); + this.replicationManager = scm.getReplicationManager(); + this.ozoneConfiguration = scm.getConfiguration(); + this.containerBalancer = containerBalancer; + this.config = config; + this.metrics = metrics; + this.scmContext = scm.getScmContext(); + this.overUtilizedNodes = new ArrayList<>(); + this.underUtilizedNodes = new ArrayList<>(); + this.withinThresholdUtilizedNodes = new ArrayList<>(); + this.unBalancedNodes = new ArrayList<>(); + this.placementPolicyValidateProxy = scm.getPlacementPolicyValidateProxy(); + this.networkTopology = scm.getClusterMap(); + this.nextIterationIndex = nextIterationIndex; + this.containerToSourceMap = new 
HashMap<>(); + this.containerToTargetMap = new HashMap<>(); + this.selectedSources = new HashSet<>(); + this.selectedTargets = new HashSet<>(); + findSourceStrategy = new FindSourceGreedy(nodeManager); + } + + /** + * Balances the cluster in iterations. Regularly checks if balancing has + * been stopped. + */ + public void run() { + try { + balancer(); + } catch (Throwable e) { + LOG.error("Container Balancer is stopped abnormally, ", e); + } + synchronized (this) { + taskStatus = Status.STOPPED; + } + } + + /** + * Tries to stop ContainerBalancer changing status to stopping. Calls + * {@link ContainerBalancerTask#stop()}. + */ + public void stop() { + synchronized (this) { + if (taskStatus == Status.RUNNING) { + taskStatus = Status.STOPPING; + } + } + } + + private void balancer() { + this.iterations = config.getIterations(); + if (this.iterations == -1) { + //run balancer infinitely + this.iterations = Integer.MAX_VALUE; + } + + // nextIterationIndex is the iteration that balancer should start from on + // leader change or restart + int i = nextIterationIndex; + for (; i < iterations && isBalancerRunning(); i++) { + // reset some variables and metrics for this iteration + resetState(); + if (config.getTriggerDuEnable()) { + // before starting a new iteration, we trigger all the datanode + // to run `du`. this is an aggressive action, with which we can + // get more precise usage info of all datanodes before moving. + // this is helpful for container balancer to make more appropriate + // decisions. this will increase the disk io load of data nodes, so + // please enable it with caution. + nodeManager.refreshAllHealthyDnUsageInfo(); + try { + long nodeReportInterval = + ozoneConfiguration.getTimeDuration(HDDS_NODE_REPORT_INTERVAL, + HDDS_NODE_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + // one for sending command , one for running du, and one for + // reporting back make it like this for now, a more suitable + // value. 
can be set in the future if needed + wait(3 * nodeReportInterval); + } catch (InterruptedException e) { + LOG.info("Container Balancer was interrupted while waiting for" + + "datanodes refreshing volume usage info"); + Thread.currentThread().interrupt(); + return; + } + } + + if (!isBalancerRunning()) { + return; + } + + // initialize this iteration. stop balancing on initialization failure + if (!initializeIteration()) { + // just return if the reason for initialization failure is that + // balancer has been stopped in another thread + if (!isBalancerRunning()) { + return; + } + // otherwise, try to stop balancer + tryStopWithSaveConfiguration("Could not initialize " + + "ContainerBalancer's iteration number " + i); + return; + } + + IterationResult iR = doIteration(); + metrics.incrementNumIterations(1); + LOG.info("Result of this iteration of Container Balancer: {}", iR); + + // if no new move option is generated, it means the cluster cannot be + // balanced anymore; so just stop balancer + if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) { + tryStopWithSaveConfiguration(iR.toString()); + return; + } + + // persist next iteration index + if (iR == IterationResult.ITERATION_COMPLETED) { + try { + saveConfiguration(config, true, i + 1); + } catch (IOException | TimeoutException e) { + LOG.warn("Could not persist next iteration index value for " + + "ContainerBalancer after completing an iteration", e); + } + } + + // return if balancing has been stopped + if (!isBalancerRunning()) { + return; + } + + // wait for configured time before starting next iteration, unless + // this was the final iteration + if (i != iterations - 1) { + try { + wait(config.getBalancingInterval().toMillis()); + } catch (InterruptedException e) { + LOG.info("Container Balancer was interrupted while waiting for" + + " next iteration."); + Thread.currentThread().interrupt(); + return; + } + } + } + + tryStopWithSaveConfiguration("Completed all iterations."); + } + + /** + * Tries to stop 
ContainerBalancer. Logs the reason for stopping. Calls + * {@link ContainerBalancer#stopBalancer()}. + * @param stopReason a string specifying the reason for stopping + * ContainerBalancer. + */ + private void tryStopWithSaveConfiguration(String stopReason) { + synchronized (this) { + try { + LOG.info("Save Configuration for stopping. Reason: {}", stopReason); + saveConfiguration(config, false, 0); + stop(); + } catch (IOException | TimeoutException e) { + LOG.warn("Save configuration failed. Reason for " + + "stopping: {}", stopReason, e); + } + } + } + + private void saveConfiguration(ContainerBalancerConfiguration configuration, + boolean shouldRun, int index) + throws IOException, TimeoutException { + if (!isValidSCMState()) { + LOG.warn("Save configuration is not allowed as not in valid State."); + return; + } + synchronized (this) { + if (isBalancerRunning()) { + containerBalancer.saveConfiguration(configuration, shouldRun, index); + } + } + } + + /** + * Initializes an iteration during balancing. Recognizes over, under, and + * within threshold utilized nodes. Decides whether balancing needs to + * continue or should be stopped. + * + * @return true if successfully initialized, otherwise false. 
+ */ + private boolean initializeIteration() { + if (!isValidSCMState()) { + return false; + } + // sorted list in order from most to least used + List<DatanodeUsageInfo> datanodeUsageInfos = + nodeManager.getMostOrLeastUsedDatanodes(true); + if (datanodeUsageInfos.isEmpty()) { + LOG.warn("Received an empty list of datanodes from Node Manager when " + + "trying to identify which nodes to balance"); + return false; + } + + this.threshold = config.getThresholdAsRatio(); + this.maxDatanodesRatioToInvolvePerIteration = + config.getMaxDatanodesRatioToInvolvePerIteration(); + this.maxSizeToMovePerIteration = config.getMaxSizeToMovePerIteration(); + if (config.getNetworkTopologyEnable()) { + findTargetStrategy = new FindTargetGreedyByNetworkTopology( + containerManager, placementPolicyValidateProxy, + nodeManager, networkTopology); + } else { + findTargetStrategy = new FindTargetGreedyByUsageInfo(containerManager, + placementPolicyValidateProxy, nodeManager); + } + this.excludeNodes = config.getExcludeNodes(); + this.includeNodes = config.getIncludeNodes(); + // include/exclude nodes from balancing according to configs + datanodeUsageInfos.removeIf(datanodeUsageInfo -> shouldExcludeDatanode( + datanodeUsageInfo.getDatanodeDetails())); + + this.totalNodesInCluster = datanodeUsageInfos.size(); + + clusterAvgUtilisation = calculateAvgUtilization(datanodeUsageInfos); + if (LOG.isDebugEnabled()) { + LOG.debug("Average utilization of the cluster is {}", + clusterAvgUtilisation); + } + + // over utilized nodes have utilization(that is, used / capacity) greater + // than upper limit + this.upperLimit = clusterAvgUtilisation + threshold; + // under utilized nodes have utilization(that is, used / capacity) less + // than lower limit + this.lowerLimit = clusterAvgUtilisation - threshold; + + if (LOG.isDebugEnabled()) { + LOG.debug("Lower limit for utilization is {} and Upper limit for " + + "utilization is {}", lowerLimit, upperLimit); + } + + long totalOverUtilizedBytes = 0L, 
totalUnderUtilizedBytes = 0L; + // find over and under utilized nodes + for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) { + if (!isBalancerRunning()) { + return false; + } + double utilization = datanodeUsageInfo.calculateUtilization(); + if (LOG.isDebugEnabled()) { + LOG.debug("Utilization for node {} with capacity {}B, used {}B, and " + + "remaining {}B is {}", + datanodeUsageInfo.getDatanodeDetails().getUuidString(), + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + datanodeUsageInfo.getScmNodeStat().getScmUsed().get(), + datanodeUsageInfo.getScmNodeStat().getRemaining().get(), + utilization); + } + if (Double.compare(utilization, upperLimit) > 0) { + overUtilizedNodes.add(datanodeUsageInfo); + metrics.incrementNumDatanodesUnbalanced(1); + + // amount of bytes greater than upper limit in this node + long overUtilizedBytes = ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + utilization) - ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + upperLimit); + totalOverUtilizedBytes += overUtilizedBytes; + } else if (Double.compare(utilization, lowerLimit) < 0) { + underUtilizedNodes.add(datanodeUsageInfo); + metrics.incrementNumDatanodesUnbalanced(1); + + // amount of bytes lesser than lower limit in this node + long underUtilizedBytes = ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + lowerLimit) - ratioToBytes( + datanodeUsageInfo.getScmNodeStat().getCapacity().get(), + utilization); + totalUnderUtilizedBytes += underUtilizedBytes; + } else { + withinThresholdUtilizedNodes.add(datanodeUsageInfo); + } + } + metrics.incrementDataSizeUnbalancedGB( + Math.max(totalOverUtilizedBytes, totalUnderUtilizedBytes) / + OzoneConsts.GB); + Collections.reverse(underUtilizedNodes); + + unBalancedNodes = new ArrayList<>( + overUtilizedNodes.size() + underUtilizedNodes.size()); + unBalancedNodes.addAll(overUtilizedNodes); + unBalancedNodes.addAll(underUtilizedNodes); + + if 
(unBalancedNodes.isEmpty()) { + LOG.info("Did not find any unbalanced Datanodes."); + return false; + } + + LOG.info("Container Balancer has identified {} Over-Utilized and {} " + + "Under-Utilized Datanodes that need to be balanced.", + overUtilizedNodes.size(), underUtilizedNodes.size()); + + if (LOG.isDebugEnabled()) { + overUtilizedNodes.forEach(entry -> { + LOG.debug("Datanode {} {} is Over-Utilized.", + entry.getDatanodeDetails().getHostName(), + entry.getDatanodeDetails().getUuid()); + }); + + underUtilizedNodes.forEach(entry -> { + LOG.debug("Datanode {} {} is Under-Utilized.", + entry.getDatanodeDetails().getHostName(), + entry.getDatanodeDetails().getUuid()); + }); + } + + selectionCriteria = new ContainerBalancerSelectionCriteria(config, + nodeManager, replicationManager, containerManager, findSourceStrategy); + return true; + } + + private boolean isValidSCMState() { + if (scmContext.isInSafeMode()) { + LOG.error("Container Balancer cannot operate while SCM is in Safe Mode."); + return false; + } + if (!scmContext.isLeaderReady()) { + LOG.warn("Current SCM is not the leader."); + return false; + } + return true; + } + + private IterationResult doIteration() { + // note that potential and selected targets are updated in the following + // loop + //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both + // source and target + findSourceStrategy.reInitialize(getPotentialSources(), config, lowerLimit); + List<DatanodeUsageInfo> potentialTargets = getPotentialTargets(); + findTargetStrategy.reInitialize(potentialTargets, config, upperLimit); + + moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size()); + boolean isMoveGeneratedInThisIteration = false; + iterationResult = IterationResult.ITERATION_COMPLETED; + + // match each source node with a target + while (true) { + if (!isBalancerRunning()) { + iterationResult = IterationResult.ITERATION_INTERRUPTED; + break; + } + + if (checkIterationLimits()) { + /* scheduled enough moves to 
hit either maxSizeToMovePerIteration or + maxDatanodesPercentageToInvolvePerIteration limit + */ + break; + } + + DatanodeDetails source = + findSourceStrategy.getNextCandidateSourceDataNode(); + if (source == null) { + // no more source DNs are present + break; + } + + ContainerMoveSelection moveSelection = matchSourceWithTarget(source); + if (moveSelection != null) { + if (processMoveSelection(source, moveSelection)) { + isMoveGeneratedInThisIteration = true; + } + } else { + // can not find any target for this source + findSourceStrategy.removeCandidateSourceDataNode(source); + } + } + + checkIterationResults(isMoveGeneratedInThisIteration); + return iterationResult; + } + + private boolean processMoveSelection(DatanodeDetails source, + ContainerMoveSelection moveSelection) { + ContainerID containerID = moveSelection.getContainerID(); + if (containerToSourceMap.containsKey(containerID) || + containerToTargetMap.containsKey(containerID)) { + LOG.warn("Container {} has already been selected for move from source " + + "{} to target {} earlier. 
Not moving this container again.", + containerID, + containerToSourceMap.get(containerID), + containerToTargetMap.get(containerID)); + return false; + } + + ContainerInfo containerInfo; + try { + containerInfo = + containerManager.getContainer(containerID); + } catch (ContainerNotFoundException e) { + LOG.warn("Could not get container {} from Container Manager before " + + "starting a container move", containerID, e); + return false; + } + LOG.info("ContainerBalancer is trying to move container {} with size " + + "{}B from source datanode {} to target datanode {}", + containerID.toString(), + containerInfo.getUsedBytes(), + source.getUuidString(), + moveSelection.getTargetNode().getUuidString()); + + if (moveContainer(source, moveSelection)) { + // consider move successful for now, and update selection criteria + updateTargetsAndSelectionCriteria(moveSelection, source); + } + return true; + } + + /** + * Check the iteration results. Result can be: + * <p>ITERATION_INTERRUPTED if balancing was stopped</p> + * <p>CAN_NOT_BALANCE_ANY_MORE if no move was generated during this iteration + * </p> + * <p>ITERATION_COMPLETED</p> + * @param isMoveGeneratedInThisIteration whether a move was generated during + * the iteration + */ + private void checkIterationResults(boolean isMoveGeneratedInThisIteration) { + if (!isMoveGeneratedInThisIteration) { + /* + If no move was generated during this iteration then we don't need to + check the move results + */ + iterationResult = IterationResult.CAN_NOT_BALANCE_ANY_MORE; + } else { + checkIterationMoveResults(); + } + } + + /** + * Checks the results of all move operations when exiting an iteration. 
+ */ + private void checkIterationMoveResults() { + this.countDatanodesInvolvedPerIteration = 0; + CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf( + moveSelectionToFutureMap.values() + .toArray(new CompletableFuture[moveSelectionToFutureMap.size()])); + try { + allFuturesResult.get(config.getMoveTimeout().toMillis(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.warn("Container balancer is interrupted"); + Thread.currentThread().interrupt(); + } catch (TimeoutException e) { + long timeoutCounts = cancelAndCountPendingMoves(); + LOG.warn("{} Container moves are canceled.", timeoutCounts); + metrics.incrementNumContainerMovesTimeoutInLatestIteration(timeoutCounts); + } catch (ExecutionException e) { + LOG.error("Got exception while checkIterationMoveResults", e); + } + + countDatanodesInvolvedPerIteration = + selectedSources.size() + selectedTargets.size(); + metrics.incrementNumDatanodesInvolvedInLatestIteration( + countDatanodesInvolvedPerIteration); + metrics.incrementNumContainerMovesCompleted( + metrics.getNumContainerMovesCompletedInLatestIteration()); + metrics.incrementNumContainerMovesTimeout( + metrics.getNumContainerMovesTimeoutInLatestIteration()); + metrics.incrementDataSizeMovedGBInLatestIteration( + sizeActuallyMovedInLatestIteration / OzoneConsts.GB); + metrics.incrementDataSizeMovedGB( + metrics.getDataSizeMovedGBInLatestIteration()); + metrics.incrementNumContainerMovesFailed( + metrics.getNumContainerMovesFailedInLatestIteration()); + LOG.info("Iteration Summary. Number of Datanodes involved: {}. Size " + + "moved: {} ({} Bytes). 
Number of Container moves completed: {}.", + countDatanodesInvolvedPerIteration, + StringUtils.byteDesc(sizeActuallyMovedInLatestIteration), + sizeActuallyMovedInLatestIteration, + metrics.getNumContainerMovesCompletedInLatestIteration()); + } + + private long cancelAndCountPendingMoves() { + return moveSelectionToFutureMap.entrySet().stream() + .filter(entry -> !entry.getValue().isDone()) + .peek(entry -> { + LOG.warn("Container move timeout for container {} from source {}" + + " to target {}.", + entry.getKey().getContainerID(), + containerToSourceMap.get(entry.getKey().getContainerID()) + .getUuidString(), + entry.getKey().getTargetNode().getUuidString()); + entry.getValue().cancel(true); + }).count(); + } + + /** + * Match a source datanode with a target datanode and identify the container + * to move. + * + * @return ContainerMoveSelection containing the selected target and container + */ + private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) { + NavigableSet<ContainerID> candidateContainers = + selectionCriteria.getCandidateContainers(source); + + if (candidateContainers.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("ContainerBalancer could not find any candidate containers " + + "for datanode {}", source.getUuidString()); + } + return null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("ContainerBalancer is finding suitable target for source " + + "datanode {}", source.getUuidString()); + } + ContainerMoveSelection moveSelection = + findTargetStrategy.findTargetForContainerMove( + source, candidateContainers); + + if (moveSelection == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("ContainerBalancer could not find a suitable target for " + + "source node {}.", source.getUuidString()); + } + return null; + } + LOG.info("ContainerBalancer matched source datanode {} with target " + + "datanode {} for container move.", source.getUuidString(), + moveSelection.getTargetNode().getUuidString()); + + return moveSelection; + } + + 
/** + * Checks if limits maxDatanodesPercentageToInvolvePerIteration and + * maxSizeToMovePerIteration have been hit. + * + * @return true if a limit was hit, else false + */ + private boolean checkIterationLimits() { + int maxDatanodesToInvolve = + (int) (maxDatanodesRatioToInvolvePerIteration * totalNodesInCluster); + if (countDatanodesInvolvedPerIteration + 2 > maxDatanodesToInvolve) { + if (LOG.isDebugEnabled()) { + LOG.debug("Hit max datanodes to involve limit. {} datanodes have" + + " already been scheduled for balancing and the limit is {}.", + countDatanodesInvolvedPerIteration, maxDatanodesToInvolve); + } + return true; + } + if (sizeScheduledForMoveInLatestIteration + + (long) ozoneConfiguration.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, + StorageUnit.BYTES) > maxSizeToMovePerIteration) { + if (LOG.isDebugEnabled()) { + LOG.debug("Hit max size to move limit. {} bytes have already been " + + "scheduled for balancing and the limit is {} bytes.", + sizeScheduledForMoveInLatestIteration, + maxSizeToMovePerIteration); + } + return true; + } + return false; + } + + /** + * Asks {@link ReplicationManager} to move the specified container from + * source to target. + * + * @param source the source datanode + * @param moveSelection the selected container to move and target datanode + * @return false if an exception occurred or the move completed with a + * result other than ReplicationManager.MoveResult.COMPLETED. 
Returns true + * if the move completed with MoveResult.COMPLETED or move is not yet done + */ + private boolean moveContainer(DatanodeDetails source, + ContainerMoveSelection moveSelection) { + ContainerID containerID = moveSelection.getContainerID(); + CompletableFuture<LegacyReplicationManager.MoveResult> future; + try { + ContainerInfo containerInfo = containerManager.getContainer(containerID); + future = replicationManager + .move(containerID, source, moveSelection.getTargetNode()) + .whenComplete((result, ex) -> { + + metrics.incrementCurrentIterationContainerMoveMetric(result, 1); + if (ex != null) { + LOG.info("Container move for container {} from source {} to " + + "target {} failed with exceptions {}", + containerID.toString(), + source.getUuidString(), + moveSelection.getTargetNode().getUuidString(), ex); + metrics.incrementNumContainerMovesFailedInLatestIteration(1); + } else { + if (result == LegacyReplicationManager.MoveResult.COMPLETED) { + sizeActuallyMovedInLatestIteration += + containerInfo.getUsedBytes(); + if (LOG.isDebugEnabled()) { + LOG.debug("Container move completed for container {} from " + + "source {} to target {}", containerID, + source.getUuidString(), + moveSelection.getTargetNode().getUuidString()); + } + } else { + LOG.warn( + "Container move for container {} from source {} to target" + + " {} failed: {}", + moveSelection.getContainerID(), source.getUuidString(), + moveSelection.getTargetNode().getUuidString(), result); + } + } + }); + } catch (ContainerNotFoundException e) { + LOG.warn("Could not find Container {} for container move", + containerID, e); + metrics.incrementNumContainerMovesFailedInLatestIteration(1); + return false; + } catch (NodeNotFoundException | TimeoutException e) { + LOG.warn("Container move failed for container {}", containerID, e); + metrics.incrementNumContainerMovesFailedInLatestIteration(1); + return false; + } + + if (future.isDone()) { + if (future.isCompletedExceptionally()) { + return false; + } else 
{ + LegacyReplicationManager.MoveResult result = future.join(); + moveSelectionToFutureMap.put(moveSelection, future); + return result == LegacyReplicationManager.MoveResult.COMPLETED; + } + } else { + moveSelectionToFutureMap.put(moveSelection, future); + return true; + } + } + + /** + * Update targets, sources, and selection criteria after a move. + * + * @param moveSelection latest selected target datanode and container + * @param source the source datanode + */ + private void updateTargetsAndSelectionCriteria( + ContainerMoveSelection moveSelection, DatanodeDetails source) { + ContainerID containerID = moveSelection.getContainerID(); + DatanodeDetails target = moveSelection.getTargetNode(); + + // count source if it has not been involved in move earlier + if (!selectedSources.contains(source)) { + countDatanodesInvolvedPerIteration += 1; + } + // count target if it has not been involved in move earlier + if (!selectedTargets.contains(target)) { + countDatanodesInvolvedPerIteration += 1; + } + + incSizeSelectedForMoving(source, moveSelection); + containerToSourceMap.put(containerID, source); + containerToTargetMap.put(containerID, target); + selectedTargets.add(target); + selectedSources.add(source); + selectionCriteria.setSelectedContainers( + new HashSet<>(containerToSourceMap.keySet())); + } + + /** + * Calculates the number of used bytes given capacity and utilization ratio. + * + * @param nodeCapacity capacity of the node. + * @param utilizationRatio used space by capacity ratio of the node. + * @return number of bytes + */ + private long ratioToBytes(Long nodeCapacity, double utilizationRatio) { + return (long) (nodeCapacity * utilizationRatio); + } + + /** + * Calculates the average utilization for the specified nodes. + * Utilization is (capacity - remaining) divided by capacity. 
+ * + * @param nodes List of DatanodeUsageInfo to find the average utilization for + * @return Average utilization value + */ + @VisibleForTesting + double calculateAvgUtilization(List<DatanodeUsageInfo> nodes) { + if (nodes.size() == 0) { + LOG.warn("No nodes to calculate average utilization for in " + + "ContainerBalancer."); + return 0; + } + SCMNodeStat aggregatedStats = new SCMNodeStat( + 0, 0, 0); + for (DatanodeUsageInfo node : nodes) { + aggregatedStats.add(node.getScmNodeStat()); + } + clusterCapacity = aggregatedStats.getCapacity().get(); + clusterRemaining = aggregatedStats.getRemaining().get(); + + return (clusterCapacity - clusterRemaining) / (double) clusterCapacity; + } + + /** + * Get potential targets for container move. Potential targets are under + * utilized and within threshold utilized nodes. + * + * @return A list of potential target DatanodeUsageInfo. + */ + private List<DatanodeUsageInfo> getPotentialTargets() { + //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both + // source and target + return underUtilizedNodes; + } + + /** + * Get potential sourecs for container move. Potential sourecs are over + * utilized and within threshold utilized nodes. + * + * @return A list of potential source DatanodeUsageInfo. + */ + private List<DatanodeUsageInfo> getPotentialSources() { + //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both + // source and target + return overUtilizedNodes; + } + + /** + * Consults the configurations {@link ContainerBalancerTask#includeNodes} and + * {@link ContainerBalancerTask#excludeNodes} to check if the specified + * Datanode should be excluded from balancing. 
+ * @param datanode DatanodeDetails to check + * @return true if Datanode should be excluded, else false + */ + private boolean shouldExcludeDatanode(DatanodeDetails datanode) { + if (excludeNodes.contains(datanode.getHostName()) || + excludeNodes.contains(datanode.getIpAddress())) { + return true; + } else if (!includeNodes.isEmpty()) { + return !includeNodes.contains(datanode.getHostName()) && + !includeNodes.contains(datanode.getIpAddress()); + } + return false; + } + + /** + * Updates conditions for balancing, such as total size moved by balancer, + * total size that has entered a datanode, and total size that has left a + * datanode in this iteration. + * + * @param source source datanode + * @param moveSelection selected target datanode and container + */ + private void incSizeSelectedForMoving(DatanodeDetails source, + ContainerMoveSelection moveSelection) { + DatanodeDetails target = moveSelection.getTargetNode(); + ContainerInfo container; + try { + container = + containerManager.getContainer(moveSelection.getContainerID()); + } catch (ContainerNotFoundException e) { + LOG.warn("Could not find Container {} while matching source and " + + "target nodes in ContainerBalancer", + moveSelection.getContainerID(), e); + return; + } + long size = container.getUsedBytes(); + sizeScheduledForMoveInLatestIteration += size; + + // update sizeLeavingNode map with the recent moveSelection + findSourceStrategy.increaseSizeLeaving(source, size); + + // update sizeEnteringNode map with the recent moveSelection + findTargetStrategy.increaseSizeEntering(target, size); + } + + /** + * Resets some variables and metrics for this iteration. 
+ */ + private void resetState() { + this.clusterCapacity = 0L; + this.clusterRemaining = 0L; + this.overUtilizedNodes.clear(); + this.underUtilizedNodes.clear(); + this.unBalancedNodes.clear(); + this.containerToSourceMap.clear(); + this.containerToTargetMap.clear(); + this.selectedSources.clear(); + this.selectedTargets.clear(); + this.countDatanodesInvolvedPerIteration = 0; + this.sizeScheduledForMoveInLatestIteration = 0; + this.sizeActuallyMovedInLatestIteration = 0; + metrics.resetDataSizeMovedGBInLatestIteration(); + metrics.resetNumContainerMovesCompletedInLatestIteration(); + metrics.resetNumContainerMovesTimeoutInLatestIteration(); + metrics.resetNumDatanodesInvolvedInLatestIteration(); + metrics.resetDataSizeUnbalancedGB(); + metrics.resetNumDatanodesUnbalanced(); + metrics.resetNumContainerMovesFailedInLatestIteration(); + } + + /** + * Checks if ContainerBalancer is currently running in this SCM. + * + * @return true if the currentBalancingThread is not null, otherwise false Review Comment: This is no longer correct ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java: ########## @@ -0,0 +1,1000 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMService; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager; +import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import 
org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link ContainerBalancerTask}.
+ */ +public class TestContainerBalancerTask { + + private static final Logger LOG = + LoggerFactory.getLogger(TestContainerBalancerTask.class); + + private ReplicationManager replicationManager; + private ContainerManager containerManager; + private ContainerBalancerTask containerBalancerTask; + private MockNodeManager mockNodeManager; + private StorageContainerManager scm; + private OzoneConfiguration conf; + private PlacementPolicy placementPolicy; + private PlacementPolicy ecPlacementPolicy; + private PlacementPolicyValidateProxy placementPolicyValidateProxy; + private ContainerBalancerConfiguration balancerConfiguration; + private List<DatanodeUsageInfo> nodesInCluster; + private List<Double> nodeUtilizations; + private double averageUtilization; + private int numberOfNodes; + private Map<ContainerID, Set<ContainerReplica>> cidToReplicasMap = + new HashMap<>(); + private Map<ContainerID, ContainerInfo> cidToInfoMap = new HashMap<>(); + private Map<DatanodeUsageInfo, Set<ContainerID>> datanodeToContainersMap = + new HashMap<>(); + private Map<String, ByteString> serviceToConfigMap = new HashMap<>(); + private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current(); + + private StatefulServiceStateManager serviceStateManager; + private static final long STORAGE_UNIT = OzoneConsts.GB; + + /** + * Sets up configuration values and creates a mock cluster. 
+ */ + @BeforeEach + public void setup() throws IOException, NodeNotFoundException, + TimeoutException { + conf = new OzoneConfiguration(); + scm = Mockito.mock(StorageContainerManager.class); + containerManager = Mockito.mock(ContainerManager.class); + replicationManager = Mockito.mock(ReplicationManager.class); + serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class); + SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class); + + // these configs will usually be specified in each test + balancerConfiguration = + conf.getObject(ContainerBalancerConfiguration.class); + balancerConfiguration.setThreshold(10); + balancerConfiguration.setIterations(1); + balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); + balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT); + balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT); + conf.setFromObject(balancerConfiguration); + GenericTestUtils.setLogLevel(ContainerBalancer.LOG, Level.DEBUG); + + averageUtilization = createCluster(); + mockNodeManager = new MockNodeManager(datanodeToContainersMap); + + NetworkTopology clusterMap = mockNodeManager.getClusterNetworkTopologyMap(); + + placementPolicy = ContainerPlacementPolicyFactory + .getPolicy(conf, mockNodeManager, clusterMap, true, + SCMContainerPlacementMetrics.create()); + ecPlacementPolicy = ContainerPlacementPolicyFactory.getECPolicy( + conf, mockNodeManager, clusterMap, + true, SCMContainerPlacementMetrics.create()); + placementPolicyValidateProxy = new PlacementPolicyValidateProxy( + placementPolicy, ecPlacementPolicy); + + Mockito.when(replicationManager + .isContainerReplicatingOrDeleting(Mockito.any(ContainerID.class))) + .thenReturn(false); + + Mockito.when(replicationManager.move(Mockito.any(ContainerID.class), + Mockito.any(DatanodeDetails.class), + Mockito.any(DatanodeDetails.class))) + .thenReturn(CompletableFuture.completedFuture(MoveResult.COMPLETED)); + + 
when(containerManager.getContainerReplicas(Mockito.any(ContainerID.class))) + .thenAnswer(invocationOnMock -> { + ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0]; + return cidToReplicasMap.get(cid); + }); + + when(containerManager.getContainer(Mockito.any(ContainerID.class))) + .thenAnswer(invocationOnMock -> { + ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0]; + return cidToInfoMap.get(cid); + }); + + when(containerManager.getContainers()) + .thenReturn(new ArrayList<>(cidToInfoMap.values())); + + when(scm.getScmNodeManager()).thenReturn(mockNodeManager); + when(scm.getContainerPlacementPolicy()).thenReturn(placementPolicy); + when(scm.getContainerManager()).thenReturn(containerManager); + when(scm.getReplicationManager()).thenReturn(replicationManager); + when(scm.getScmContext()).thenReturn(SCMContext.emptyContext()); + when(scm.getClusterMap()).thenReturn(null); + when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class)); + when(scm.getConfiguration()).thenReturn(conf); + when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager); + when(scm.getSCMServiceManager()).thenReturn(scmServiceManager); + when(scm.getPlacementPolicyValidateProxy()) + .thenReturn(placementPolicyValidateProxy); + + /* + When StatefulServiceStateManager#saveConfiguration is called, save to + in-memory serviceToConfigMap instead. + */ + Mockito.doAnswer(i -> { + serviceToConfigMap.put(i.getArgument(0, String.class), i.getArgument(1, + ByteString.class)); + return null; + }).when(serviceStateManager).saveConfiguration( + Mockito.any(String.class), + Mockito.any(ByteString.class)); + + /* + When StatefulServiceStateManager#readConfiguration is called, read from + serviceToConfigMap instead. 
+ */ + when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer( + i -> serviceToConfigMap.get(i.getArgument(0, String.class))); + + Mockito.doNothing().when(scmServiceManager) + .register(Mockito.any(SCMService.class)); + ContainerBalancer sb = new ContainerBalancer(scm); + containerBalancerTask = new ContainerBalancerTask(scm, 0, sb, + sb.getMetrics(), null); + } + + @Test + public void testCalculationOfUtilization() { + Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size()); + for (int i = 0; i < nodesInCluster.size(); i++) { + Assertions.assertEquals(nodeUtilizations.get(i), + nodesInCluster.get(i).calculateUtilization(), 0.0001); + } + + // should be equal to average utilization of the cluster + Assertions.assertEquals(averageUtilization, + containerBalancerTask.calculateAvgUtilization(nodesInCluster), 0.0001); + } + + /** + * Checks whether ContainerBalancer is correctly updating the list of + * unBalanced nodes with varying values of Threshold. 
+ */ + @Test + public void + initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException, TimeoutException { Review Comment: These exceptions are not being thrown in the method body ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java: ########## @@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException, } @Test - public void testCalculationOfUtilization() { - Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size()); - for (int i = 0; i < nodesInCluster.size(); i++) { - Assertions.assertEquals(nodeUtilizations.get(i), - nodesInCluster.get(i).calculateUtilization(), 0.0001); - } - + public void testShouldRun() throws Exception { + boolean doRun = containerBalancer.shouldRun(); // should be equal to average utilization of the cluster - Assertions.assertEquals(averageUtilization, - containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001); - } - - /** - * Checks whether ContainerBalancer is correctly updating the list of - * unBalanced nodes with varying values of Threshold. - */ - @Test - public void - initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - List<DatanodeUsageInfo> expectedUnBalancedNodes; - List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer; - - // check for random threshold values - for (int i = 0; i < 50; i++) { - double randomThreshold = RANDOM.nextDouble() * 100; - - balancerConfiguration.setThreshold(randomThreshold); - startBalancer(balancerConfiguration); - - // waiting for balance completed. 
- // TODO: this is a temporary implementation for now - // modify this after balancer is fully completed - try { - Thread.sleep(100); - } catch (InterruptedException e) { } - - expectedUnBalancedNodes = - determineExpectedUnBalancedNodes(randomThreshold); - unBalancedNodesAccordingToBalancer = - containerBalancer.getUnBalancedNodes(); - - stopBalancer(); - Assertions.assertEquals( - expectedUnBalancedNodes.size(), - unBalancedNodesAccordingToBalancer.size()); - - for (int j = 0; j < expectedUnBalancedNodes.size(); j++) { - Assertions.assertEquals( - expectedUnBalancedNodes.get(j).getDatanodeDetails(), - unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails()); - } - } + Assertions.assertEquals(doRun, false); + containerBalancer.saveConfiguration(balancerConfiguration, true, 0); + doRun = containerBalancer.shouldRun(); + Assertions.assertEquals(doRun, true); + containerBalancer.saveConfiguration(balancerConfiguration, false, 0); + doRun = containerBalancer.shouldRun(); + Assertions.assertEquals(doRun, false); } - /** - * Checks whether the list of unBalanced nodes is empty when the cluster is - * balanced. - */ @Test - public void unBalancedNodesListShouldBeEmptyWhenClusterIsBalanced() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - balancerConfiguration.setThreshold(99.99); - startBalancer(balancerConfiguration); - - sleepWhileBalancing(100); - - stopBalancer(); - ContainerBalancerMetrics metrics = containerBalancer.getMetrics(); - Assertions.assertEquals(0, containerBalancer.getUnBalancedNodes().size()); - Assertions.assertEquals(0, metrics.getNumDatanodesUnbalanced()); - } - - /** - * ContainerBalancer should not involve more datanodes than the - * maxDatanodesRatioToInvolvePerIteration limit. 
- */ - @Test - public void containerBalancerShouldObeyMaxDatanodesToInvolveLimit() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - int percent = 20; - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration( - percent); - balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT); - balancerConfiguration.setThreshold(1); - balancerConfiguration.setIterations(1); - startBalancer(balancerConfiguration); - - sleepWhileBalancing(500); - - int number = percent * numberOfNodes / 100; - ContainerBalancerMetrics metrics = containerBalancer.getMetrics(); - Assertions.assertFalse( - containerBalancer.getCountDatanodesInvolvedPerIteration() > number); - Assertions.assertTrue( - metrics.getNumDatanodesInvolvedInLatestIteration() > 0); - Assertions.assertFalse( - metrics.getNumDatanodesInvolvedInLatestIteration() > number); - stopBalancer(); - } + public void testStartBalancerStop() throws Exception { + Mockito.when(replicationManager.move(Mockito.any(ContainerID.class), + Mockito.any(DatanodeDetails.class), + Mockito.any(DatanodeDetails.class))) + .thenReturn(genCompletableFuture(1000)); - @Test - public void containerBalancerShouldSelectOnlyClosedContainers() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - // make all containers open, balancer should not select any of them - for (ContainerInfo containerInfo : cidToInfoMap.values()) { - containerInfo.setState(HddsProtos.LifeCycleState.OPEN); - } balancerConfiguration.setThreshold(10); - startBalancer(balancerConfiguration); - sleepWhileBalancing(500); - stopBalancer(); - - // balancer should have identified unbalanced nodes - Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty()); - // no container should have been selected - Assertions.assertTrue(containerBalancer.getContainerToSourceMap() - .isEmpty()); - /* - 
Iteration result should be CAN_NOT_BALANCE_ANY_MORE because no container - move is generated - */ - Assertions.assertEquals( - ContainerBalancer.IterationResult.CAN_NOT_BALANCE_ANY_MORE, - containerBalancer.getIterationResult()); - - // now, close all containers - for (ContainerInfo containerInfo : cidToInfoMap.values()) { - containerInfo.setState(HddsProtos.LifeCycleState.CLOSED); - } - startBalancer(balancerConfiguration); - sleepWhileBalancing(500); - stopBalancer(); - - // check whether all selected containers are closed - for (ContainerID cid: - containerBalancer.getContainerToSourceMap().keySet()) { - Assertions.assertSame( - cidToInfoMap.get(cid).getState(), HddsProtos.LifeCycleState.CLOSED); - } - } - - @Test - public void containerBalancerShouldObeyMaxSizeToMoveLimit() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - balancerConfiguration.setThreshold(1); - balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT); balancerConfiguration.setIterations(1); - startBalancer(balancerConfiguration); - - sleepWhileBalancing(500); - - // balancer should not have moved more size than the limit - Assertions.assertFalse( - containerBalancer.getSizeScheduledForMoveInLatestIteration() > - 10 * STORAGE_UNIT); - - long size = containerBalancer.getMetrics() - .getDataSizeMovedGBInLatestIteration(); - Assertions.assertTrue(size > 0); - Assertions.assertFalse(size > 10); - stopBalancer(); - } - - @Test - public void targetDatanodeShouldNotAlreadyContainSelectedContainer() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - balancerConfiguration.setThreshold(10); + balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT); balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT); balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - 
startBalancer(balancerConfiguration); - - sleepWhileBalancing(1000); - - stopBalancer(); - Map<ContainerID, DatanodeDetails> map = - containerBalancer.getContainerToTargetMap(); - for (Map.Entry<ContainerID, DatanodeDetails> entry : map.entrySet()) { - ContainerID container = entry.getKey(); - DatanodeDetails target = entry.getValue(); - Assertions.assertTrue(cidToReplicasMap.get(container) - .stream() - .map(ContainerReplica::getDatanodeDetails) - .noneMatch(target::equals)); - } - } + balancerConfiguration.setMoveTimeout(Duration.ofMillis(1000)); - @Test - public void containerMoveSelectionShouldFollowPlacementPolicy() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - balancerConfiguration.setThreshold(10); - balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - balancerConfiguration.setIterations(1); startBalancer(balancerConfiguration); - - sleepWhileBalancing(500); - - stopBalancer(); - Map<ContainerID, DatanodeDetails> containerFromSourceMap = - containerBalancer.getContainerToSourceMap(); - Map<ContainerID, DatanodeDetails> containerToTargetMap = - containerBalancer.getContainerToTargetMap(); - - // for each move selection, check if {replicas - source + target} - // satisfies placement policy - for (Map.Entry<ContainerID, DatanodeDetails> entry : - containerFromSourceMap.entrySet()) { - ContainerID container = entry.getKey(); - DatanodeDetails source = entry.getValue(); - - List<DatanodeDetails> replicas = cidToReplicasMap.get(container) - .stream() - .map(ContainerReplica::getDatanodeDetails) - .collect(Collectors.toList()); - // remove source and add target - replicas.remove(source); - replicas.add(containerToTargetMap.get(container)); - - ContainerInfo containerInfo = cidToInfoMap.get(container); - ContainerPlacementStatus placementStatus = - 
placementPolicy.validateContainerPlacement(replicas, - containerInfo.getReplicationConfig().getRequiredNodes()); - Assertions.assertTrue(placementStatus.isPolicySatisfied()); - } - } - - @Test - public void targetDatanodeShouldBeInServiceHealthy() - throws NodeNotFoundException, IllegalContainerBalancerStateException, - IOException, InvalidContainerBalancerConfigurationException, - TimeoutException { - balancerConfiguration.setThreshold(10); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT); - balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT); - balancerConfiguration.setIterations(1); - startBalancer(balancerConfiguration); - - sleepWhileBalancing(500); - - stopBalancer(); - for (DatanodeDetails target : containerBalancer.getSelectedTargets()) { - NodeStatus status = mockNodeManager.getNodeStatus(target); - Assertions.assertSame(HddsProtos.NodeOperationalState.IN_SERVICE, - status.getOperationalState()); - Assertions.assertTrue(status.isHealthy()); + try { + containerBalancer.startBalancer(balancerConfiguration); + Assertions.assertTrue(false, + "Exception should be thrown when startBalancer again"); + } catch (IllegalContainerBalancerStateException e) { + // start failed again, valid case } - } - - @Test - public void selectedContainerShouldNotAlreadyHaveBeenSelected() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, NodeNotFoundException, - TimeoutException { - balancerConfiguration.setThreshold(10); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT); - balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT); - balancerConfiguration.setIterations(1); - startBalancer(balancerConfiguration); - - sleepWhileBalancing(500); - - stopBalancer(); - - int numContainers = 
containerBalancer.getContainerToTargetMap().size(); - - /* - Assuming move is called exactly once for each unique container, number of - calls to move should equal number of unique containers. If number of - calls to move is more than number of unique containers, at least one - container has been re-selected. It's expected that number of calls to - move should equal number of unique, selected containers (from - containerToTargetMap). - */ - Mockito.verify(replicationManager, times(numContainers)) - .move(any(ContainerID.class), any(DatanodeDetails.class), - any(DatanodeDetails.class)); - } - - @Test - public void balancerShouldNotSelectConfiguredExcludeContainers() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - balancerConfiguration.setThreshold(10); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT); - balancerConfiguration.setMaxSizeEnteringTarget(50 * STORAGE_UNIT); - balancerConfiguration.setExcludeContainers("1, 4, 5"); - startBalancer(balancerConfiguration); - - sleepWhileBalancing(500); - - stopBalancer(); - Set<ContainerID> excludeContainers = - balancerConfiguration.getExcludeContainers(); - for (ContainerID container : - containerBalancer.getContainerToSourceMap().keySet()) { - Assertions.assertFalse(excludeContainers.contains(container)); + try { + containerBalancer.start(); + Assertions.assertTrue(false, + "Exception should be thrown when start again"); + } catch (IllegalContainerBalancerStateException e) { + // start failed again, valid case } - } - - @Test - public void balancerShouldObeyMaxSizeEnteringTargetLimit() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - conf.set("ozone.scm.container.size", "1MB"); - balancerConfiguration = - conf.getObject(ContainerBalancerConfiguration.class); 
- balancerConfiguration.setThreshold(10); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - balancerConfiguration.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT); - - // no containers should be selected when the limit is just 2 MB - balancerConfiguration.setMaxSizeEnteringTarget(2 * OzoneConsts.MB); - startBalancer(balancerConfiguration); - sleepWhileBalancing(500); - - Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty()); - Assertions.assertTrue(containerBalancer.getContainerToSourceMap() - .isEmpty()); - stopBalancer(); - - // some containers should be selected when using default values - OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); - ContainerBalancerConfiguration cbc = ozoneConfiguration. - getObject(ContainerBalancerConfiguration.class); - startBalancer(cbc); - - sleepWhileBalancing(500); - - stopBalancer(); - // balancer should have identified unbalanced nodes - Assertions.assertFalse(containerBalancer.getUnBalancedNodes().isEmpty()); - Assertions.assertFalse(containerBalancer.getContainerToSourceMap() - .isEmpty()); - } - - @Test - public void testMetrics() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - conf.set("hdds.datanode.du.refresh.period", "1ms"); - balancerConfiguration.setBalancingInterval(Duration.ofMillis(2)); - balancerConfiguration.setThreshold(10); - balancerConfiguration.setIterations(1); - balancerConfiguration.setMaxSizeEnteringTarget(6 * STORAGE_UNIT); - // deliberately set max size per iteration to a low value, 6 GB - balancerConfiguration.setMaxSizeToMovePerIteration(6 * STORAGE_UNIT); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - - startBalancer(balancerConfiguration); - sleepWhileBalancing(500); - stopBalancer(); - - ContainerBalancerMetrics metrics = containerBalancer.getMetrics(); - Assertions.assertEquals(determineExpectedUnBalancedNodes( - 
balancerConfiguration.getThreshold()).size(), - metrics.getNumDatanodesUnbalanced()); - Assertions.assertTrue(metrics.getDataSizeMovedGBInLatestIteration() <= 6); - Assertions.assertEquals(1, metrics.getNumIterations()); - } - - /** - * Tests if {@link ContainerBalancer} follows the includeNodes and - * excludeNodes configurations in {@link ContainerBalancerConfiguration}. - * If the includeNodes configuration is not empty, only the specified - * includeNodes should be included in balancing. excludeNodes should be - * excluded from balancing. If a datanode is specified in both include and - * exclude configurations, then it should be excluded. - */ - @Test - public void balancerShouldFollowExcludeAndIncludeDatanodesConfigurations() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - balancerConfiguration.setThreshold(10); - balancerConfiguration.setIterations(1); - balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT); - balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - // only these nodes should be included - // the ones also specified in excludeNodes should be excluded - int firstIncludeIndex = 0, secondIncludeIndex = 1; - int thirdIncludeIndex = nodesInCluster.size() - 2; - int fourthIncludeIndex = nodesInCluster.size() - 1; - String includeNodes = - nodesInCluster.get(firstIncludeIndex).getDatanodeDetails() - .getIpAddress() + ", " + - nodesInCluster.get(secondIncludeIndex).getDatanodeDetails() - .getIpAddress() + ", " + - nodesInCluster.get(thirdIncludeIndex).getDatanodeDetails() - .getHostName() + ", " + - nodesInCluster.get(fourthIncludeIndex).getDatanodeDetails() - .getHostName(); + Assertions.assertTrue(containerBalancer.getBalancerStatus() + == ContainerBalancerTask.Status.RUNNING); - // these nodes should be excluded - int firstExcludeIndex = 0, secondExcludeIndex 
= nodesInCluster.size() - 1; - String excludeNodes = - nodesInCluster.get(firstExcludeIndex).getDatanodeDetails() - .getIpAddress() + ", " + - nodesInCluster.get(secondExcludeIndex).getDatanodeDetails() - .getHostName(); - - balancerConfiguration.setExcludeNodes(excludeNodes); - balancerConfiguration.setIncludeNodes(includeNodes); - startBalancer(balancerConfiguration); - sleepWhileBalancing(500); stopBalancer(); + Assertions.assertTrue(containerBalancer.getBalancerStatus() + == ContainerBalancerTask.Status.STOPPED); - // finally, these should be the only nodes included in balancing - // (included - excluded) - DatanodeDetails dn1 = - nodesInCluster.get(secondIncludeIndex).getDatanodeDetails(); - DatanodeDetails dn2 = - nodesInCluster.get(thirdIncludeIndex).getDatanodeDetails(); - Map<ContainerID, DatanodeDetails> containerFromSourceMap = - containerBalancer.getContainerToSourceMap(); - Map<ContainerID, DatanodeDetails> containerToTargetMap = - containerBalancer.getContainerToTargetMap(); - for (Map.Entry<ContainerID, DatanodeDetails> entry : - containerFromSourceMap.entrySet()) { - DatanodeDetails source = entry.getValue(); - DatanodeDetails target = containerToTargetMap.get(entry.getKey()); - Assertions.assertTrue(source.equals(dn1) || source.equals(dn2)); - Assertions.assertTrue(target.equals(dn1) || target.equals(dn2)); + try { + containerBalancer.stopBalancer(); + Assertions.assertTrue(false, + "Exception should be thrown when stop again"); + } catch (Exception e) { + // stop failed as already stopped, valid case } } @Test - public void testContainerBalancerConfiguration() { - OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); - ozoneConfiguration.set("ozone.scm.container.size", "5GB"); - ozoneConfiguration.setDouble( - "hdds.container.balancer.utilization.threshold", 1); - - ContainerBalancerConfiguration cbConf = - ozoneConfiguration.getObject(ContainerBalancerConfiguration.class); - Assertions.assertEquals(1, cbConf.getThreshold(), 0.001); - 
- Assertions.assertEquals(26 * 1024 * 1024 * 1024L, - cbConf.getMaxSizeLeavingSource()); - - Assertions.assertEquals(30 * 60 * 1000, - cbConf.getMoveTimeout().toMillis()); - } - - @Test - public void checkIterationResult() - throws NodeNotFoundException, IOException, - IllegalContainerBalancerStateException, - InvalidContainerBalancerConfigurationException, - TimeoutException { - balancerConfiguration.setThreshold(10); - balancerConfiguration.setIterations(1); - balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT); - balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT); - balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100); - - startBalancer(balancerConfiguration); - sleepWhileBalancing(1000); - - /* - According to the setup and configurations, this iteration's result should - be ITERATION_COMPLETED. - */ - Assertions.assertEquals( - ContainerBalancer.IterationResult.ITERATION_COMPLETED, - containerBalancer.getIterationResult()); - stopBalancer(); - - /* - Now, limit maxSizeToMovePerIteration but fail all container moves. The - result should still be ITERATION_COMPLETED. 
- */ - Mockito.when(replicationManager.move(Mockito.any(ContainerID.class), - Mockito.any(DatanodeDetails.class), - Mockito.any(DatanodeDetails.class))) - .thenReturn(CompletableFuture.completedFuture( - MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY)); - balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT); - - startBalancer(balancerConfiguration); - sleepWhileBalancing(1000); - - Assertions.assertEquals( - ContainerBalancer.IterationResult.ITERATION_COMPLETED, - containerBalancer.getIterationResult()); - stopBalancer(); - } - - @Test - public void checkIterationResultTimeout() - throws NodeNotFoundException, IOException, - IllegalContainerBalancerStateException, - InvalidContainerBalancerConfigurationException, - TimeoutException { - + public void testStartStop() throws Exception { Review Comment: Maybe we can change this test name to something more appropriate? ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java: ########## @@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException, } @Test - public void testCalculationOfUtilization() { - Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size()); - for (int i = 0; i < nodesInCluster.size(); i++) { - Assertions.assertEquals(nodeUtilizations.get(i), - nodesInCluster.get(i).calculateUtilization(), 0.0001); - } - + public void testShouldRun() throws Exception { + boolean doRun = containerBalancer.shouldRun(); // should be equal to average utilization of the cluster - Assertions.assertEquals(averageUtilization, - containerBalancer.calculateAvgUtilization(nodesInCluster), 0.0001); - } - - /** - * Checks whether ContainerBalancer is correctly updating the list of - * unBalanced nodes with varying values of Threshold. 
- */ - @Test - public void - initializeIterationShouldUpdateUnBalancedNodesWhenThresholdChanges() - throws IllegalContainerBalancerStateException, IOException, - InvalidContainerBalancerConfigurationException, TimeoutException { - List<DatanodeUsageInfo> expectedUnBalancedNodes; - List<DatanodeUsageInfo> unBalancedNodesAccordingToBalancer; - - // check for random threshold values - for (int i = 0; i < 50; i++) { - double randomThreshold = RANDOM.nextDouble() * 100; - - balancerConfiguration.setThreshold(randomThreshold); - startBalancer(balancerConfiguration); - - // waiting for balance completed. - // TODO: this is a temporary implementation for now - // modify this after balancer is fully completed - try { - Thread.sleep(100); - } catch (InterruptedException e) { } - - expectedUnBalancedNodes = - determineExpectedUnBalancedNodes(randomThreshold); - unBalancedNodesAccordingToBalancer = - containerBalancer.getUnBalancedNodes(); - - stopBalancer(); - Assertions.assertEquals( - expectedUnBalancedNodes.size(), - unBalancedNodesAccordingToBalancer.size()); - - for (int j = 0; j < expectedUnBalancedNodes.size(); j++) { - Assertions.assertEquals( - expectedUnBalancedNodes.get(j).getDatanodeDetails(), - unBalancedNodesAccordingToBalancer.get(j).getDatanodeDetails()); - } - } + Assertions.assertEquals(doRun, false); + containerBalancer.saveConfiguration(balancerConfiguration, true, 0); + doRun = containerBalancer.shouldRun(); + Assertions.assertEquals(doRun, true); + containerBalancer.saveConfiguration(balancerConfiguration, false, 0); + doRun = containerBalancer.shouldRun(); + Assertions.assertEquals(doRun, false); Review Comment: ```suggestion Assertions.assertFalse(doRun); ``` ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java: ########## @@ -212,559 +206,106 @@ public void setup() throws IOException, NodeNotFoundException, } @Test - public void testCalculationOfUtilization() { - 
Assertions.assertEquals(nodesInCluster.size(), nodeUtilizations.size()); - for (int i = 0; i < nodesInCluster.size(); i++) { - Assertions.assertEquals(nodeUtilizations.get(i), - nodesInCluster.get(i).calculateUtilization(), 0.0001); - } - + public void testShouldRun() throws Exception { + boolean doRun = containerBalancer.shouldRun(); // should be equal to average utilization of the cluster Review Comment: This comment is no longer valid: `doRun` is the boolean result of `shouldRun()`, not the average utilization of the cluster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
