This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 52e619c HDDS-5517. Support multiple container moves from a source
datanode in one balance iteration (#2808)
52e619c is described below
commit 52e619cf2b68bf387b613810377e482e4d56aed0
Author: Jackson Yao <[email protected]>
AuthorDate: Tue Nov 23 15:12:12 2021 +0800
HDDS-5517. Support multiple container moves from a source datanode in one
balance iteration (#2808)
---
.../scm/container/balancer/ContainerBalancer.java | 186 +++++++--------------
.../balancer/ContainerBalancerConfiguration.java | 11 +-
.../ContainerBalancerSelectionCriteria.java | 22 ++-
.../scm/container/balancer/FindSourceGreedy.java | 158 +++++++++++++++++
.../scm/container/balancer/FindSourceStrategy.java | 67 ++++++++
.../scm/container/balancer/FindTargetGreedy.java | 121 ++++++++++++--
.../scm/container/balancer/FindTargetStrategy.java | 32 ++--
7 files changed, 436 insertions(+), 161 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 646ff8a..a234682 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -52,7 +51,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
/**
* Container balancer is a service in SCM to move containers between over- and
@@ -86,15 +84,15 @@ public class ContainerBalancer {
private long clusterRemaining;
private double clusterAvgUtilisation;
private double upperLimit;
+ private double lowerLimit;
private volatile boolean balancerRunning;
private volatile Thread currentBalancingThread;
private Lock lock;
private ContainerBalancerSelectionCriteria selectionCriteria;
private Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap;
- private Map<DatanodeDetails, Long> sizeLeavingNode;
- private Map<DatanodeDetails, Long> sizeEnteringNode;
private Set<ContainerID> selectedContainers;
private FindTargetStrategy findTargetStrategy;
+ private FindSourceStrategy findSourceStrategy;
private Map<ContainerMoveSelection,
CompletableFuture<ReplicationManager.MoveResult>>
moveSelectionToFutureMap;
@@ -131,8 +129,9 @@ public class ContainerBalancer {
this.unBalancedNodes = new ArrayList<>();
this.lock = new ReentrantLock();
- findTargetStrategy =
- new FindTargetGreedy(containerManager, placementPolicy);
+ findTargetStrategy = new FindTargetGreedy(
+ containerManager, placementPolicy, nodeManager);
+ findSourceStrategy = new FindSourceGreedy(nodeManager);
}
/**
@@ -251,7 +250,6 @@ public class ContainerBalancer {
this.selectedContainers.clear();
this.overUtilizedNodes.clear();
this.underUtilizedNodes.clear();
- this.withinThresholdUtilizedNodes.clear();
this.unBalancedNodes.clear();
this.countDatanodesInvolvedPerIteration = 0;
this.sizeMovedPerIteration = 0;
@@ -262,20 +260,19 @@ public class ContainerBalancer {
clusterAvgUtilisation);
}
- // under utilized nodes have utilization(that is, used / capacity) less
- // than lower limit
- double lowerLimit = clusterAvgUtilisation - threshold;
-
// over utilized nodes have utilization(that is, used / capacity) greater
// than upper limit
this.upperLimit = clusterAvgUtilisation + threshold;
+ // under utilized nodes have utilization(that is, used / capacity) less
+ // than lower limit
+ this.lowerLimit = clusterAvgUtilisation - threshold;
if (LOG.isDebugEnabled()) {
LOG.debug("Lower limit for utilization is {} and Upper limit for " +
"utilization is {}", lowerLimit, upperLimit);
}
- long overUtilizedBytes = 0L, underUtilizedBytes = 0L;
+ long totalOverUtilizedBytes = 0L, totalUnderUtilizedBytes = 0L;
// find over and under utilized nodes
for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
if (!isBalancerRunning()) {
@@ -291,7 +288,7 @@ public class ContainerBalancer {
datanodeUsageInfo.getScmNodeStat().getRemaining().get(),
utilization);
}
- if (utilization > upperLimit) {
+ if (Double.compare(utilization, upperLimit) > 0) {
overUtilizedNodes.add(datanodeUsageInfo);
metrics.incrementDatanodesNumToBalance(1);
@@ -300,27 +297,30 @@ public class ContainerBalancer {
ratioToPercent(utilization)));
// amount of bytes greater than upper limit in this node
- overUtilizedBytes += ratioToBytes(
+ Long overUtilizedBytes = ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
utilization) - ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
upperLimit);
- } else if (utilization < lowerLimit) {
+ totalOverUtilizedBytes += overUtilizedBytes;
+ } else if (Double.compare(utilization, lowerLimit) < 0) {
underUtilizedNodes.add(datanodeUsageInfo);
metrics.incrementDatanodesNumToBalance(1);
// amount of bytes lesser than lower limit in this node
- underUtilizedBytes += ratioToBytes(
+ Long underUtilizedBytes = ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
lowerLimit) - ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
utilization);
+ totalUnderUtilizedBytes += underUtilizedBytes;
} else {
withinThresholdUtilizedNodes.add(datanodeUsageInfo);
}
}
metrics.setDataSizeToBalanceGB(
- Math.max(overUtilizedBytes, underUtilizedBytes) / OzoneConsts.GB);
+ Math.max(totalOverUtilizedBytes, totalUnderUtilizedBytes) /
+ OzoneConsts.GB);
Collections.reverse(underUtilizedNodes);
unBalancedNodes = new ArrayList<>(
@@ -338,102 +338,62 @@ public class ContainerBalancer {
overUtilizedNodes.size(), underUtilizedNodes.size());
selectionCriteria = new ContainerBalancerSelectionCriteria(config,
- nodeManager, replicationManager, containerManager);
+ nodeManager, replicationManager, containerManager, findSourceStrategy);
sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
withinThresholdUtilizedNodes.size());
-
- // initialize maps to track how much size is leaving and entering datanodes
- sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
- withinThresholdUtilizedNodes.size());
- overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
- .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
- withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
- .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
-
- sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
- withinThresholdUtilizedNodes.size());
- underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
- .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
- withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
- .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
-
return true;
}
private IterationResult doIteration() {
// note that potential and selected targets are updated in the following
// loop
- List<DatanodeDetails> potentialTargets = getPotentialTargets();
+ //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+ // source and target
+ findSourceStrategy.reInitialize(getPotentialSources(), config, lowerLimit);
+ List<DatanodeUsageInfo> potentialTargets = getPotentialTargets();
+ findTargetStrategy.reInitialize(potentialTargets, config, upperLimit);
+
Set<DatanodeDetails> selectedTargets =
new HashSet<>(potentialTargets.size());
moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
boolean isMoveGenerated = false;
-
try {
// match each overUtilized node with a target
- for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+ while (true) {
+ DatanodeDetails source =
+ findSourceStrategy.getNextCandidateSourceDataNode();
+ if (source == null) {
+ break;
+ }
if (!isBalancerRunning()) {
return IterationResult.ITERATION_INTERRUPTED;
}
- DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+
IterationResult result = checkConditionsForBalancing();
if (result != null) {
return result;
}
- ContainerMoveSelection moveSelection =
- matchSourceWithTarget(source, potentialTargets);
+ ContainerMoveSelection moveSelection = matchSourceWithTarget(source);
if (moveSelection != null) {
isMoveGenerated = true;
LOG.info("ContainerBalancer is trying to move container {} from " +
"source datanode {} to target datanode {}",
- moveSelection.getContainerID().toString(),
source.getUuidString(),
+ moveSelection.getContainerID().toString(),
+ source.getUuidString(),
moveSelection.getTargetNode().getUuidString());
if (moveContainer(source, moveSelection)) {
// consider move successful for now, and update selection criteria
- potentialTargets = updateTargetsAndSelectionCriteria(
- potentialTargets, selectedTargets, moveSelection, source);
+ updateTargetsAndSelectionCriteria(
+ selectedTargets, moveSelection, source);
}
+ } else {
+ // can not find any target for this source
+ findSourceStrategy.removeCandidateSourceDataNode(source);
}
}
- // if not all underUtilized nodes have been selected, try to match
- // withinThresholdUtilized nodes with underUtilized nodes
- if (selectedTargets.size() < underUtilizedNodes.size()) {
- potentialTargets.removeAll(selectedTargets);
- Collections.reverse(withinThresholdUtilizedNodes);
-
- for (DatanodeUsageInfo datanodeUsageInfo :
- withinThresholdUtilizedNodes) {
- if (!balancerRunning) {
- return IterationResult.ITERATION_INTERRUPTED;
- }
- DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
- IterationResult result = checkConditionsForBalancing();
- if (result != null) {
- return result;
- }
-
- ContainerMoveSelection moveSelection =
- matchSourceWithTarget(source, potentialTargets);
- if (moveSelection != null) {
- isMoveGenerated = true;
- LOG.info("ContainerBalancer is trying to move container {} from " +
- "source datanode {} to target datanode {}",
- moveSelection.getContainerID().toString(),
- source.getUuidString(),
- moveSelection.getTargetNode().getUuidString());
-
- if (moveContainer(source, moveSelection)) {
- // consider move successful for now, and update selection criteria
- potentialTargets =
- updateTargetsAndSelectionCriteria(potentialTargets,
- selectedTargets, moveSelection, source);
- }
- }
- }
- }
if (!isMoveGenerated) {
//no move option is generated, so the cluster can not be
//balanced any more, just stop iteration and exit
@@ -502,12 +462,9 @@ public class ContainerBalancer {
* Match a source datanode with a target datanode and identify the container
* to move.
*
- * @param potentialTargets Collection of potential targets to move
- * container to
* @return ContainerMoveSelection containing the selected target and container
*/
- private ContainerMoveSelection matchSourceWithTarget(
- DatanodeDetails source, Collection<DatanodeDetails> potentialTargets) {
+ private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) {
NavigableSet<ContainerID> candidateContainers =
selectionCriteria.getCandidateContainers(source);
@@ -518,14 +475,14 @@ public class ContainerBalancer {
}
return null;
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("ContainerBalancer is finding suitable target for source " +
"datanode {}", source.getUuidString());
}
ContainerMoveSelection moveSelection =
findTargetStrategy.findTargetForContainerMove(
- source, potentialTargets, candidateContainers,
- this::canSizeEnterTarget);
+ source, candidateContainers);
if (moveSelection == null) {
if (LOG.isDebugEnabled()) {
@@ -622,15 +579,13 @@ public class ContainerBalancer {
/**
* Update targets and selection criteria after a move.
*
- * @param potentialTargets potential target datanodes
* @param selectedTargets selected target datanodes
* @param moveSelection the target datanode and container that has been
* just selected
* @param source the source datanode
* @return List of updated potential targets
*/
- private List<DatanodeDetails> updateTargetsAndSelectionCriteria(
- Collection<DatanodeDetails> potentialTargets,
+ private void updateTargetsAndSelectionCriteria(
Set<DatanodeDetails> selectedTargets,
ContainerMoveSelection moveSelection, DatanodeDetails source) {
// count source if it has not been involved in move earlier
@@ -647,10 +602,6 @@ public class ContainerBalancer {
selectedTargets.add(moveSelection.getTargetNode());
selectedContainers.add(moveSelection.getContainerID());
selectionCriteria.setSelectedContainers(selectedContainers);
-
- return potentialTargets.stream()
- .filter(node -> sizeEnteringNode.get(node) <
- config.getMaxSizeEnteringTarget()).collect(Collectors.toList());
}
/**
@@ -689,46 +640,31 @@ public class ContainerBalancer {
return (clusterCapacity - clusterRemaining) / (double) clusterCapacity;
}
+
+
+
/**
- * Checks if specified size can enter specified target datanode
- * according to {@link ContainerBalancerConfiguration}
- * "size.entering.target.max".
+ * Get potential targets for container move. Potential targets are under
+ * utilized and within threshold utilized nodes.
*
- * @param target target datanode in which size is entering
- * @param size size in bytes
- * @return true if size can enter target, else false
+ * @return A list of potential target DatanodeUsageInfo.
*/
- boolean canSizeEnterTarget(DatanodeDetails target, long size) {
- if (sizeEnteringNode.containsKey(target)) {
- long sizeEnteringAfterMove = sizeEnteringNode.get(target) + size;
- //size can be moved into target datanode only when the following
- //two condition are met.
- //1 sizeEnteringAfterMove does not succeed the configured
- // MaxSizeEnteringTarget
- //2 current usage of target datanode plus sizeEnteringAfterMove
- // is smaller than or equal to upperLimit
- return sizeEnteringAfterMove <= config.getMaxSizeEnteringTarget() &&
- nodeManager.getUsageInfo(target)
- .calculateUtilization(sizeEnteringAfterMove) <= upperLimit;
- }
- return false;
+ private List<DatanodeUsageInfo> getPotentialTargets() {
+ //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+ // source and target
+ return underUtilizedNodes;
}
/**
- * Get potential targets for container move. Potential targets are under
+ * Get potential sources for container move. Potential sources are over
* utilized and within threshold utilized nodes.
*
- * @return A list of potential target DatanodeDetails.
+ * @return A list of potential source DatanodeUsageInfo.
*/
- private List<DatanodeDetails> getPotentialTargets() {
- List<DatanodeDetails> potentialTargets = new ArrayList<>(
- underUtilizedNodes.size() + withinThresholdUtilizedNodes.size());
-
- underUtilizedNodes
- .forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
- withinThresholdUtilizedNodes
- .forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
- return potentialTargets;
+ private List<DatanodeUsageInfo> getPotentialSources() {
+ //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+ // source and target
+ return overUtilizedNodes;
}
/**
@@ -756,10 +692,10 @@ public class ContainerBalancer {
sizeMovedPerIteration += size;
// update sizeLeavingNode map with the recent moveSelection
- sizeLeavingNode.put(source, sizeLeavingNode.get(source) + size);
+ findSourceStrategy.increaseSizeLeaving(source, size);
// update sizeEnteringNode map with the recent moveSelection
- sizeEnteringNode.put(target, sizeEnteringNode.get(target) + size);
+ findTargetStrategy.increaseSizeEntering(target, size);
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 4e7b889..a77c1b7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -307,9 +307,16 @@ public final class ContainerBalancerConfiguration {
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
- "%-50s %dB%n", "Key", "Value", "Threshold",
+ "%-50s %dGB%n"+
+ "%-50s %dGB%n"+
+ "%-50s %dGB%n", "Key", "Value", "Threshold",
threshold, "Max Datanodes to Involve per Iteration(ratio)",
maxDatanodesRatioToInvolvePerIteration,
- "Max Size to Move per Iteration", maxSizeToMovePerIteration);
+ "Max Size to Move per Iteration",
+ maxSizeToMovePerIteration / OzoneConsts.GB,
+ "Max Size Entering Target per Iteration",
+ maxSizeEnteringTarget / OzoneConsts.GB,
+ "Max Size Leaving Source per Iteration",
+ maxSizeLeavingSource / OzoneConsts.GB);
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
index 11d1571..b5f5acd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
@@ -48,18 +48,21 @@ public class ContainerBalancerSelectionCriteria {
private ContainerManager containerManager;
private Set<ContainerID> selectedContainers;
private Set<ContainerID> excludeContainers;
+ private FindSourceStrategy findSourceStrategy;
public ContainerBalancerSelectionCriteria(
ContainerBalancerConfiguration balancerConfiguration,
NodeManager nodeManager,
ReplicationManager replicationManager,
- ContainerManager containerManager) {
+ ContainerManager containerManager,
+ FindSourceStrategy findSourceStrategy) {
this.balancerConfiguration = balancerConfiguration;
this.nodeManager = nodeManager;
this.replicationManager = replicationManager;
this.containerManager = containerManager;
selectedContainers = new HashSet<>();
excludeContainers = balancerConfiguration.getExcludeContainers();
+ this.findSourceStrategy = findSourceStrategy;
}
/**
@@ -115,6 +118,23 @@ public class ContainerBalancerSelectionCriteria {
}
});
+ //if the utilization of the source data node becomes lower than lowerLimit
+ //after the container is moved out , then the container can not be
+ // a candidate one, and we should remove it from the candidateContainers.
+ containerIDSet.removeIf(c -> {
+ ContainerInfo cInfo;
+ try {
+ cInfo = containerManager.getContainer(c);
+ } catch (ContainerNotFoundException e) {
+ LOG.warn("Could not find container {} when " +
+ "be matched with a move target", c);
+ //remove this not found container
+ return true;
+ }
+ return !findSourceStrategy.canSizeLeaveSource(
+ node, cInfo.getUsedBytes());
+ });
+
containerIDSet.removeIf(this::isContainerReplicatingOrDeleting);
return containerIDSet;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
new file mode 100644
index 0000000..591461d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.UUID;
+
+/**
+ * The selection criteria for selecting source datanodes, the containers of
+ * which will be moved out.
+ */
+public class FindSourceGreedy implements FindSourceStrategy{
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FindSourceGreedy.class);
+ private Map<DatanodeDetails, Long> sizeLeavingNode;
+ private PriorityQueue<DatanodeUsageInfo> potentialSources;
+ private NodeManager nodeManager;
+ private ContainerBalancerConfiguration config;
+ private Double lowerLimit;
+
+ FindSourceGreedy(NodeManager nodeManager) {
+ sizeLeavingNode = new HashMap<>();
+ potentialSources = new PriorityQueue<>((a, b) -> {
+ double currentUsageOfA = a.calculateUtilization(
+ -sizeLeavingNode.get(a.getDatanodeDetails()));
+ double currentUsageOfB = b.calculateUtilization(
+ -sizeLeavingNode.get(b.getDatanodeDetails()));
+ //in descending order
+ int ret = Double.compare(currentUsageOfB, currentUsageOfA);
+ if (ret != 0) {
+ return ret;
+ }
+ UUID uuidA = a.getDatanodeDetails().getUuid();
+ UUID uuidB = b.getDatanodeDetails().getUuid();
+ return uuidA.compareTo(uuidB);
+ });
+ this.nodeManager = nodeManager;
+ }
+
+ private void setLowerLimit(Double lowerLimit) {
+ this.lowerLimit = lowerLimit;
+ }
+
+ private void setPotentialSources(
+ List<DatanodeUsageInfo> potentialSourceDataNodes) {
+ potentialSources.clear();
+ sizeLeavingNode.clear();
+ potentialSourceDataNodes.forEach(
+ c -> sizeLeavingNode.put(c.getDatanodeDetails(), 0L));
+ potentialSources.addAll(potentialSourceDataNodes);
+ }
+
+ private void setConfiguration(ContainerBalancerConfiguration conf) {
+ this.config = conf;
+ }
+
+ /**
+ * increase the Leaving size of a candidate source data node.
+ */
+ @Override
+ public void increaseSizeLeaving(DatanodeDetails dui, long size) {
+ Long currentSize = sizeLeavingNode.get(dui);
+ if(currentSize != null) {
+ sizeLeavingNode.put(dui, currentSize + size);
+ //reorder according to the latest sizeLeavingNode
+ potentialSources.add(nodeManager.getUsageInfo(dui));
+ return;
+ }
+ LOG.warn("Cannot find datanode {} in candidate source datanodes",
+ dui.getUuid());
+ }
+
+ /**
+ * get the next candidate source data node according to
+ * the strategy.
+ *
+ * @return the next candidate source data node.
+ */
+ @Override
+ public DatanodeDetails getNextCandidateSourceDataNode() {
+ if (potentialSources.isEmpty()) {
+ LOG.info("no more candidate source data node");
+ return null;
+ }
+ return potentialSources.poll().getDatanodeDetails();
+ }
+
+ /**
+ * remove the specified data node from candidate source
+ * data nodes.
+ */
+ @Override
+ public void removeCandidateSourceDataNode(DatanodeDetails dui){
+ potentialSources.removeIf(a -> a.getDatanodeDetails().equals(dui));
+ }
+
+ /**
+ * Checks if specified size can leave a specified source datanode
+ * according to {@link ContainerBalancerConfiguration}
+ * {@code getMaxSizeLeavingSource()} and the utilization lower limit.
+ *
+ * @param source source datanode from which size is leaving
+ * @param size size in bytes
+ * @return true if size can leave, else false
+ */
+ @Override
+ public boolean canSizeLeaveSource(DatanodeDetails source, long size) {
+ if (sizeLeavingNode.containsKey(source)) {
+ long sizeLeavingAfterMove = sizeLeavingNode.get(source) + size;
+ //size can be moved out of source datanode only when the following
+ //two condition are met.
+ //1 sizeLeavingAfterMove does not exceed the configured
+ // MaxSizeLeavingSource
+ //2 after subtracting sizeLeavingAfterMove, the usage is bigger
+ // than or equal to lowerLimit
+ return sizeLeavingAfterMove <= config.getMaxSizeLeavingSource() &&
+ Double.compare(nodeManager.getUsageInfo(source)
+ .calculateUtilization(-sizeLeavingAfterMove), lowerLimit) >= 0;
+ }
+ return false;
+ }
+
+ /**
+ * reInitialize FindSourceStrategy.
+ */
+ @Override
+ public void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+ ContainerBalancerConfiguration conf,
+ Double lowLimit) {
+ setConfiguration(conf);
+ setLowerLimit(lowLimit);
+ setPotentialSources(potentialDataNodes);
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java
new file mode 100644
index 0000000..826ecac
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+
+import java.util.List;
+
+/**
+ * This interface can be used to implement strategies to get a
+ * source datanode.
+ */
+public interface FindSourceStrategy {
+
+ /**
+ * get the next candidate source data node according to
+ * the strategy.
+ *
+ * @return the next candidate source data node.
+ */
+ DatanodeDetails getNextCandidateSourceDataNode();
+
+ /**
+ * remove the specified data node from candidate source
+ * data nodes.
+ */
+ void removeCandidateSourceDataNode(DatanodeDetails dui);
+
+ /**
+ * increase the Leaving size of a candidate source data node.
+ */
+ void increaseSizeLeaving(DatanodeDetails dui, long size);
+
+ /**
+ * Checks if specified size can leave a specified source datanode
+ * according to {@link ContainerBalancerConfiguration}
+ * {@code getMaxSizeLeavingSource()}.
+ *
+ * @param source source datanode from which size is leaving
+ * @param size size in bytes
+ * @return true if size can leave, else false
+ */
+ boolean canSizeLeaveSource(DatanodeDetails source, long size);
+
+ /**
+ * reInitialize FindSourceStrategy.
+ */
+ void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+ ContainerBalancerConfiguration config, Double lowerLimit);
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
index 9d3deb4..5508943 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
@@ -26,13 +26,17 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.TreeSet;
+import java.util.UUID;
import java.util.stream.Collectors;
/**
@@ -44,37 +48,72 @@ public class FindTargetGreedy implements FindTargetStrategy {
private ContainerManager containerManager;
private PlacementPolicy placementPolicy;
+ private Map<DatanodeDetails, Long> sizeEnteringNode;
+ private NodeManager nodeManager;
+ private ContainerBalancerConfiguration config;
+ private Double upperLimit;
+ private TreeSet<DatanodeUsageInfo> potentialTargets;
public FindTargetGreedy(
ContainerManager containerManager,
- PlacementPolicy placementPolicy) {
+ PlacementPolicy placementPolicy,
+ NodeManager nodeManager) {
+ sizeEnteringNode = new HashMap<>();
this.containerManager = containerManager;
this.placementPolicy = placementPolicy;
+ this.nodeManager = nodeManager;
+
+ potentialTargets = new TreeSet<>((a, b) -> {
+ double currentUsageOfA = a.calculateUtilization(
+ sizeEnteringNode.get(a.getDatanodeDetails()));
+ double currentUsageOfB = b.calculateUtilization(
+ sizeEnteringNode.get(b.getDatanodeDetails()));
+ int ret = Double.compare(currentUsageOfA, currentUsageOfB);
+ if (ret != 0) {
+ return ret;
+ }
+ UUID uuidA = a.getDatanodeDetails().getUuid();
+ UUID uuidB = b.getDatanodeDetails().getUuid();
+ return uuidA.compareTo(uuidB);
+ });
+ }
+
+ private void setUpperLimit(Double upperLimit){
+ this.upperLimit = upperLimit;
+ }
+
+ private void setPotentialTargets(
+ List<DatanodeUsageInfo> potentialTargetDataNodes) {
+ sizeEnteringNode.clear();
+ potentialTargetDataNodes.forEach(
+ p -> sizeEnteringNode.put(p.getDatanodeDetails(), 0L));
+ potentialTargets.clear();
+ potentialTargets.addAll(potentialTargetDataNodes);
+ }
+
+ private void setConfiguration(ContainerBalancerConfiguration conf) {
+ this.config = conf;
}
/**
* Find a {@link ContainerMoveSelection} consisting of a target and
* container to move for a source datanode. Favours more under-utilized nodes.
* @param source Datanode to find a target for
- * @param potentialTargets Collection of potential target datanodes
* @param candidateContainers Set of candidate containers satisfying
* selection criteria
* {@link ContainerBalancerSelectionCriteria}
- * @param canSizeEnterTarget A functional interface whose apply
* (DatanodeDetails, Long) method returns true if the size specified in the
* second argument can enter the specified DatanodeDetails node
* @return Found target and container
*/
@Override
public ContainerMoveSelection findTargetForContainerMove(
- DatanodeDetails source, Collection<DatanodeDetails> potentialTargets,
- Set<ContainerID> candidateContainers,
- BiFunction<DatanodeDetails, Long, Boolean> canSizeEnterTarget) {
- for (DatanodeDetails target : potentialTargets) {
+ DatanodeDetails source, Set<ContainerID> candidateContainers) {
+ for (DatanodeUsageInfo targetInfo : potentialTargets) {
+ DatanodeDetails target = targetInfo.getDatanodeDetails();
for (ContainerID container : candidateContainers) {
Set<ContainerReplica> replicas;
ContainerInfo containerInfo;
-
try {
replicas = containerManager.getContainerReplicas(container);
containerInfo = containerManager.getContainer(container);
@@ -88,7 +127,7 @@ public class FindTargetGreedy implements FindTargetStrategy {
replica -> replica.getDatanodeDetails().equals(target)) &&
containerMoveSatisfiesPlacementPolicy(container, replicas, source,
target) &&
- canSizeEnterTarget.apply(target, containerInfo.getUsedBytes())) {
+ canSizeEnterTarget(target, containerInfo.getUsedBytes())) {
return new ContainerMoveSelection(target, container);
}
}
@@ -107,8 +146,7 @@ public class FindTargetGreedy implements FindTargetStrategy {
* @param target Target datanode for container move
* @return true if placement policy is satisfied, otherwise false
*/
- @Override
- public boolean containerMoveSatisfiesPlacementPolicy(
+ private boolean containerMoveSatisfiesPlacementPolicy(
ContainerID containerID, Set<ContainerReplica> replicas,
DatanodeDetails source, DatanodeDetails target) {
ContainerInfo containerInfo;
@@ -132,4 +170,61 @@ public class FindTargetGreedy implements FindTargetStrategy {
return placementStatus.isPolicySatisfied();
}
+
+ /**
+ * Checks if specified size can enter specified target datanode
+ * according to {@link ContainerBalancerConfiguration}
+ * "size.entering.target.max".
+ *
+ * @param target target datanode in which size is entering
+ * @param size size in bytes
+ * @return true if size can enter target, else false
+ */
+ private boolean canSizeEnterTarget(DatanodeDetails target, long size) {
+ if (sizeEnteringNode.containsKey(target)) {
+ long sizeEnteringAfterMove = sizeEnteringNode.get(target) + size;
+ //size can be moved into target datanode only when the following
+ //two conditions are met.
+ //1 sizeEnteringAfterMove does not exceed the configured
+ // MaxSizeEnteringTarget
+ //2 current usage of target datanode plus sizeEnteringAfterMove
+ // is smaller than or equal to upperLimit
+ return sizeEnteringAfterMove <= config.getMaxSizeEnteringTarget() &&
+ Double.compare(nodeManager.getUsageInfo(target)
+ .calculateUtilization(sizeEnteringAfterMove), upperLimit) <= 0;
+ }
+ return false;
+ }
+
+ /**
+ * increase the Entering size of a candidate target data node.
+ */
+ @Override
+ public void increaseSizeEntering(DatanodeDetails target, long size) {
+ if(sizeEnteringNode.containsKey(target)) {
+ long totalEnteringSize = sizeEnteringNode.get(target) + size;
+ sizeEnteringNode.put(target, totalEnteringSize);
+ potentialTargets.removeIf(
+ c -> c.getDatanodeDetails().equals(target));
+ if(totalEnteringSize < config.getMaxSizeEnteringTarget()) {
+ //reorder
+ potentialTargets.add(nodeManager.getUsageInfo(target));
+ }
+ return;
+ }
+ LOG.warn("Cannot find {} in the candidates target nodes",
+ target.getUuid());
+ }
+
+ /**
+ * reInitialize FindTargetStrategy with the given new parameters.
+ */
+ @Override
+ public void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+ ContainerBalancerConfiguration conf,
+ Double upLimit) {
+ setConfiguration(conf);
+ setUpperLimit(upLimit);
+ setPotentialTargets(potentialDataNodes);
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java
index 444f365..de87860 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java
@@ -20,11 +20,10 @@ package org.apache.hadoop.hdds.scm.container.balancer;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
-import java.util.Collection;
+import java.util.List;
import java.util.Set;
-import java.util.function.BiFunction;
/**
* This interface can be used to implement strategies to find a target for a
@@ -39,33 +38,26 @@ public interface FindTargetStrategy {
* enter a potential target.
*
* @param source Datanode to find a target for
- * @param potentialTargets Collection of potential target datanodes
* @param candidateContainers Set of candidate containers satisfying
* selection criteria
* {@link ContainerBalancerSelectionCriteria}
- * @param canSizeEnterTarget A functional interface whose apply
* (DatanodeDetails, Long) method returns true if the size specified in the
* second argument can enter the specified DatanodeDetails node
* @return {@link ContainerMoveSelection} containing the target node and
* selected container
*/
ContainerMoveSelection findTargetForContainerMove(
- DatanodeDetails source, Collection<DatanodeDetails> potentialTargets,
- Set<ContainerID> candidateContainers,
- BiFunction<DatanodeDetails, Long, Boolean> canSizeEnterTarget);
+ DatanodeDetails source, Set<ContainerID> candidateContainers);
/**
- * Checks whether moving the specified container from the specified source
- * to target datanode will satisfy the placement policy.
- *
- * @param containerID Container to be moved from source to target
- * @param replicas Set of replicas of the given container
- * @param source Source datanode for container move
- * @param target Target datanode for container move
- * @return true if placement policy is satisfied
+ * increase the Entering size of a candidate target data node.
*/
- boolean containerMoveSatisfiesPlacementPolicy(ContainerID containerID,
- Set<ContainerReplica> replicas,
- DatanodeDetails source,
- DatanodeDetails target);
+ void increaseSizeEntering(DatanodeDetails target, long size);
+
+ /**
+ * reInitialize FindTargetStrategy.
+ */
+ void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+ ContainerBalancerConfiguration config, Double upperLimit);
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]