This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 52e619c  HDDS-5517. Support multiple container moves from a source datanode in one balance iteration (#2808)
52e619c is described below

commit 52e619cf2b68bf387b613810377e482e4d56aed0
Author: Jackson Yao <[email protected]>
AuthorDate: Tue Nov 23 15:12:12 2021 +0800

    HDDS-5517. Support multiple container moves from a source datanode in one balance iteration (#2808)
---
 .../scm/container/balancer/ContainerBalancer.java  | 186 +++++++--------------
 .../balancer/ContainerBalancerConfiguration.java   |  11 +-
 .../ContainerBalancerSelectionCriteria.java        |  22 ++-
 .../scm/container/balancer/FindSourceGreedy.java   | 158 +++++++++++++++++
 .../scm/container/balancer/FindSourceStrategy.java |  67 ++++++++
 .../scm/container/balancer/FindTargetGreedy.java   | 121 ++++++++++++--
 .../scm/container/balancer/FindTargetStrategy.java |  32 ++--
 7 files changed, 436 insertions(+), 161 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 646ff8a..a234682 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,7 +51,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 /**
  * Container balancer is a service in SCM to move containers between over- and
@@ -86,15 +84,15 @@ public class ContainerBalancer {
   private long clusterRemaining;
   private double clusterAvgUtilisation;
   private double upperLimit;
+  private double lowerLimit;
   private volatile boolean balancerRunning;
   private volatile Thread currentBalancingThread;
   private Lock lock;
   private ContainerBalancerSelectionCriteria selectionCriteria;
   private Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap;
-  private Map<DatanodeDetails, Long> sizeLeavingNode;
-  private Map<DatanodeDetails, Long> sizeEnteringNode;
   private Set<ContainerID> selectedContainers;
   private FindTargetStrategy findTargetStrategy;
+  private FindSourceStrategy findSourceStrategy;
   private Map<ContainerMoveSelection,
       CompletableFuture<ReplicationManager.MoveResult>>
       moveSelectionToFutureMap;
@@ -131,8 +129,9 @@ public class ContainerBalancer {
     this.unBalancedNodes = new ArrayList<>();
 
     this.lock = new ReentrantLock();
-    findTargetStrategy =
-        new FindTargetGreedy(containerManager, placementPolicy);
+    findTargetStrategy = new FindTargetGreedy(
+        containerManager, placementPolicy, nodeManager);
+    findSourceStrategy = new FindSourceGreedy(nodeManager);
   }
 
   /**
@@ -251,7 +250,6 @@ public class ContainerBalancer {
     this.selectedContainers.clear();
     this.overUtilizedNodes.clear();
     this.underUtilizedNodes.clear();
-    this.withinThresholdUtilizedNodes.clear();
     this.unBalancedNodes.clear();
     this.countDatanodesInvolvedPerIteration = 0;
     this.sizeMovedPerIteration = 0;
@@ -262,20 +260,19 @@ public class ContainerBalancer {
           clusterAvgUtilisation);
     }
 
-    // under utilized nodes have utilization(that is, used / capacity) less
-    // than lower limit
-    double lowerLimit = clusterAvgUtilisation - threshold;
-
     // over utilized nodes have utilization(that is, used / capacity) greater
     // than upper limit
     this.upperLimit = clusterAvgUtilisation + threshold;
+    // under utilized nodes have utilization(that is, used / capacity) less
+    // than lower limit
+    this.lowerLimit = clusterAvgUtilisation - threshold;
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Lower limit for utilization is {} and Upper limit for " +
           "utilization is {}", lowerLimit, upperLimit);
     }
 
-    long overUtilizedBytes = 0L, underUtilizedBytes = 0L;
+    long totalOverUtilizedBytes = 0L, totalUnderUtilizedBytes = 0L;
     // find over and under utilized nodes
     for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
       if (!isBalancerRunning()) {
@@ -291,7 +288,7 @@ public class ContainerBalancer {
             datanodeUsageInfo.getScmNodeStat().getRemaining().get(),
             utilization);
       }
-      if (utilization > upperLimit) {
+      if (Double.compare(utilization, upperLimit) > 0) {
         overUtilizedNodes.add(datanodeUsageInfo);
         metrics.incrementDatanodesNumToBalance(1);
 
@@ -300,27 +297,30 @@ public class ContainerBalancer {
             ratioToPercent(utilization)));
 
         // amount of bytes greater than upper limit in this node
-        overUtilizedBytes += ratioToBytes(
+        Long overUtilizedBytes = ratioToBytes(
             datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
             utilization) - ratioToBytes(
             datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
             upperLimit);
-      } else if (utilization < lowerLimit) {
+        totalOverUtilizedBytes += overUtilizedBytes;
+      } else if (Double.compare(utilization, lowerLimit) < 0) {
         underUtilizedNodes.add(datanodeUsageInfo);
         metrics.incrementDatanodesNumToBalance(1);
 
         // amount of bytes lesser than lower limit in this node
-        underUtilizedBytes += ratioToBytes(
+        Long underUtilizedBytes = ratioToBytes(
             datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
             lowerLimit) - ratioToBytes(
             datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
             utilization);
+        totalUnderUtilizedBytes += underUtilizedBytes;
       } else {
         withinThresholdUtilizedNodes.add(datanodeUsageInfo);
       }
     }
     metrics.setDataSizeToBalanceGB(
-        Math.max(overUtilizedBytes, underUtilizedBytes) / OzoneConsts.GB);
+        Math.max(totalOverUtilizedBytes, totalUnderUtilizedBytes) /
+            OzoneConsts.GB);
     Collections.reverse(underUtilizedNodes);
 
     unBalancedNodes = new ArrayList<>(
@@ -338,102 +338,62 @@ public class ContainerBalancer {
         overUtilizedNodes.size(), underUtilizedNodes.size());
 
     selectionCriteria = new ContainerBalancerSelectionCriteria(config,
-        nodeManager, replicationManager, containerManager);
+        nodeManager, replicationManager, containerManager, findSourceStrategy);
     sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
         withinThresholdUtilizedNodes.size());
-
-    // initialize maps to track how much size is leaving and entering datanodes
-    sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
-        withinThresholdUtilizedNodes.size());
-    overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
-        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
-    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
-        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
-
-    sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
-        withinThresholdUtilizedNodes.size());
-    underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
-        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
-    withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
-        .put(datanodeUsageInfo.getDatanodeDetails(), 0L));
-
     return true;
   }
 
   private IterationResult doIteration() {
     // note that potential and selected targets are updated in the following
     // loop
-    List<DatanodeDetails> potentialTargets = getPotentialTargets();
+    //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+    // source and target
+    findSourceStrategy.reInitialize(getPotentialSources(), config, lowerLimit);
+    List<DatanodeUsageInfo> potentialTargets = getPotentialTargets();
+    findTargetStrategy.reInitialize(potentialTargets, config, upperLimit);
+
     Set<DatanodeDetails> selectedTargets =
         new HashSet<>(potentialTargets.size());
     moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
     boolean isMoveGenerated = false;
-
     try {
       // match each overUtilized node with a target
-      for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
+      while (true) {
+        DatanodeDetails source =
+            findSourceStrategy.getNextCandidateSourceDataNode();
+        if (source == null) {
+          break;
+        }
         if (!isBalancerRunning()) {
           return IterationResult.ITERATION_INTERRUPTED;
         }
-        DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
+
         IterationResult result = checkConditionsForBalancing();
         if (result != null) {
           return result;
         }
 
-        ContainerMoveSelection moveSelection =
-            matchSourceWithTarget(source, potentialTargets);
+        ContainerMoveSelection moveSelection = matchSourceWithTarget(source);
         if (moveSelection != null) {
           isMoveGenerated = true;
           LOG.info("ContainerBalancer is trying to move container {} from " +
                   "source datanode {} to target datanode {}",
-              moveSelection.getContainerID().toString(), 
source.getUuidString(),
+              moveSelection.getContainerID().toString(),
+              source.getUuidString(),
               moveSelection.getTargetNode().getUuidString());
 
           if (moveContainer(source, moveSelection)) {
             // consider move successful for now, and update selection criteria
-            potentialTargets = updateTargetsAndSelectionCriteria(
-                potentialTargets, selectedTargets, moveSelection, source);
+            updateTargetsAndSelectionCriteria(
+                selectedTargets, moveSelection, source);
           }
+        } else {
+          // can not find any target for this source
+          findSourceStrategy.removeCandidateSourceDataNode(source);
         }
       }
 
-      // if not all underUtilized nodes have been selected, try to match
-      // withinThresholdUtilized nodes with underUtilized nodes
-      if (selectedTargets.size() < underUtilizedNodes.size()) {
-        potentialTargets.removeAll(selectedTargets);
-        Collections.reverse(withinThresholdUtilizedNodes);
-
-        for (DatanodeUsageInfo datanodeUsageInfo :
-            withinThresholdUtilizedNodes) {
-          if (!balancerRunning) {
-            return IterationResult.ITERATION_INTERRUPTED;
-          }
-          DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
-          IterationResult result = checkConditionsForBalancing();
-          if (result != null) {
-            return result;
-          }
-
-          ContainerMoveSelection moveSelection =
-              matchSourceWithTarget(source, potentialTargets);
-          if (moveSelection != null) {
-            isMoveGenerated = true;
-            LOG.info("ContainerBalancer is trying to move container {} from " +
-                    "source datanode {} to target datanode {}",
-                moveSelection.getContainerID().toString(),
-                source.getUuidString(),
-                moveSelection.getTargetNode().getUuidString());
-
-            if (moveContainer(source, moveSelection)) {
-              // consider move successful for now, and update selection criteria
-              potentialTargets =
-                  updateTargetsAndSelectionCriteria(potentialTargets,
-                      selectedTargets, moveSelection, source);
-            }
-          }
-        }
-      }
       if (!isMoveGenerated) {
         //no move option is generated, so the cluster can not be
         //balanced any more, just stop iteration and exit
@@ -502,12 +462,9 @@ public class ContainerBalancer {
    * Match a source datanode with a target datanode and identify the container
    * to move.
    *
-   * @param potentialTargets Collection of potential targets to move
-   *                         container to
    * @return ContainerMoveSelection containing the selected target and container
    */
-  private ContainerMoveSelection matchSourceWithTarget(
-      DatanodeDetails source, Collection<DatanodeDetails> potentialTargets) {
+  private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) 
{
     NavigableSet<ContainerID> candidateContainers =
         selectionCriteria.getCandidateContainers(source);
 
@@ -518,14 +475,14 @@ public class ContainerBalancer {
       }
       return null;
     }
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("ContainerBalancer is finding suitable target for source " +
           "datanode {}", source.getUuidString());
     }
     ContainerMoveSelection moveSelection =
         findTargetStrategy.findTargetForContainerMove(
-            source, potentialTargets, candidateContainers,
-            this::canSizeEnterTarget);
+            source, candidateContainers);
 
     if (moveSelection == null) {
       if (LOG.isDebugEnabled()) {
@@ -622,15 +579,13 @@ public class ContainerBalancer {
   /**
    * Update targets and selection criteria after a move.
    *
-   * @param potentialTargets potential target datanodes
    * @param selectedTargets  selected target datanodes
    * @param moveSelection    the target datanode and container that has been
    *                         just selected
    * @param source           the source datanode
    * @return List of updated potential targets
    */
-  private List<DatanodeDetails> updateTargetsAndSelectionCriteria(
-      Collection<DatanodeDetails> potentialTargets,
+  private void updateTargetsAndSelectionCriteria(
       Set<DatanodeDetails> selectedTargets,
       ContainerMoveSelection moveSelection, DatanodeDetails source) {
     // count source if it has not been involved in move earlier
@@ -647,10 +602,6 @@ public class ContainerBalancer {
     selectedTargets.add(moveSelection.getTargetNode());
     selectedContainers.add(moveSelection.getContainerID());
     selectionCriteria.setSelectedContainers(selectedContainers);
-
-    return potentialTargets.stream()
-        .filter(node -> sizeEnteringNode.get(node) <
-            config.getMaxSizeEnteringTarget()).collect(Collectors.toList());
   }
 
   /**
@@ -689,46 +640,31 @@ public class ContainerBalancer {
     return (clusterCapacity - clusterRemaining) / (double) clusterCapacity;
   }
 
+
+
+
   /**
-   * Checks if specified size can enter specified target datanode
-   * according to {@link ContainerBalancerConfiguration}
-   * "size.entering.target.max".
+   * Get potential targets for container move. Potential targets are under
+   * utilized and within threshold utilized nodes.
    *
-   * @param target target datanode in which size is entering
-   * @param size   size in bytes
-   * @return true if size can enter target, else false
+   * @return A list of potential target DatanodeUsageInfo.
    */
-  boolean canSizeEnterTarget(DatanodeDetails target, long size) {
-    if (sizeEnteringNode.containsKey(target)) {
-      long sizeEnteringAfterMove = sizeEnteringNode.get(target) + size;
-      //size can be moved into target datanode only when the following
-      //two condition are met.
-      //1 sizeEnteringAfterMove does not succeed the configured
-      // MaxSizeEnteringTarget
-      //2 current usage of target datanode plus sizeEnteringAfterMove
-      // is smaller than or equal to upperLimit
-      return sizeEnteringAfterMove <= config.getMaxSizeEnteringTarget() &&
-           nodeManager.getUsageInfo(target)
-               .calculateUtilization(sizeEnteringAfterMove) <= upperLimit;
-    }
-    return false;
+  private List<DatanodeUsageInfo> getPotentialTargets() {
+    //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+    // source and target
+    return underUtilizedNodes;
   }
 
   /**
-   * Get potential targets for container move. Potential targets are under
+   * Get potential sources for container move. Potential sources are over
    * utilized and within threshold utilized nodes.
    *
-   * @return A list of potential target DatanodeDetails.
+   * @return A list of potential source DatanodeUsageInfo.
    */
-  private List<DatanodeDetails> getPotentialTargets() {
-    List<DatanodeDetails> potentialTargets = new ArrayList<>(
-        underUtilizedNodes.size() + withinThresholdUtilizedNodes.size());
-
-    underUtilizedNodes
-        .forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
-    withinThresholdUtilizedNodes
-        .forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
-    return potentialTargets;
+  private List<DatanodeUsageInfo> getPotentialSources() {
+    //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
+    // source and target
+    return overUtilizedNodes;
   }
 
   /**
@@ -756,10 +692,10 @@ public class ContainerBalancer {
     sizeMovedPerIteration += size;
 
     // update sizeLeavingNode map with the recent moveSelection
-    sizeLeavingNode.put(source, sizeLeavingNode.get(source) + size);
+    findSourceStrategy.increaseSizeLeaving(source, size);
 
     // update sizeEnteringNode map with the recent moveSelection
-    sizeEnteringNode.put(target, sizeEnteringNode.get(target) + size);
+    findTargetStrategy.increaseSizeEntering(target, size);
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 4e7b889..a77c1b7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -307,9 +307,16 @@ public final class ContainerBalancerConfiguration {
             "%-50s %s%n" +
             "%-50s %s%n" +
             "%-50s %s%n" +
-            "%-50s %dB%n", "Key", "Value", "Threshold",
+            "%-50s %dGB%n"+
+            "%-50s %dGB%n"+
+            "%-50s %dGB%n", "Key", "Value", "Threshold",
         threshold, "Max Datanodes to Involve per Iteration(ratio)",
         maxDatanodesRatioToInvolvePerIteration,
-        "Max Size to Move per Iteration", maxSizeToMovePerIteration);
+        "Max Size to Move per Iteration",
+        maxSizeToMovePerIteration / OzoneConsts.GB,
+        "Max Size Entering Target per Iteration",
+        maxSizeEnteringTarget / OzoneConsts.GB,
+        "Max Size Leaving Source per Iteration",
+        maxSizeLeavingSource / OzoneConsts.GB);
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
index 11d1571..b5f5acd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
@@ -48,18 +48,21 @@ public class ContainerBalancerSelectionCriteria {
   private ContainerManager containerManager;
   private Set<ContainerID> selectedContainers;
   private Set<ContainerID> excludeContainers;
+  private FindSourceStrategy findSourceStrategy;
 
   public ContainerBalancerSelectionCriteria(
       ContainerBalancerConfiguration balancerConfiguration,
       NodeManager nodeManager,
       ReplicationManager replicationManager,
-      ContainerManager containerManager) {
+      ContainerManager containerManager,
+      FindSourceStrategy findSourceStrategy) {
     this.balancerConfiguration = balancerConfiguration;
     this.nodeManager = nodeManager;
     this.replicationManager = replicationManager;
     this.containerManager = containerManager;
     selectedContainers = new HashSet<>();
     excludeContainers = balancerConfiguration.getExcludeContainers();
+    this.findSourceStrategy = findSourceStrategy;
   }
 
   /**
@@ -115,6 +118,23 @@ public class ContainerBalancerSelectionCriteria {
       }
     });
 
+    // if the utilization of the source data node becomes lower than lowerLimit
+    // after the container is moved out, then the container cannot be
+    // a candidate one, and we should remove it from the candidateContainers.
+    containerIDSet.removeIf(c -> {
+      ContainerInfo cInfo;
+      try {
+        cInfo = containerManager.getContainer(c);
+      } catch (ContainerNotFoundException e) {
+        LOG.warn("Could not find container {} when " +
+            "be matched with a move target", c);
+        //remove this not found container
+        return true;
+      }
+      return !findSourceStrategy.canSizeLeaveSource(
+          node, cInfo.getUsedBytes());
+    });
+
     containerIDSet.removeIf(this::isContainerReplicatingOrDeleting);
     return containerIDSet;
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
new file mode 100644
index 0000000..591461d
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.UUID;
+
+/**
+ * The selection criteria for selecting source datanodes, the containers of
+ * which will be moved out.
+ */
+public class FindSourceGreedy implements FindSourceStrategy{
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FindSourceGreedy.class);
+  private Map<DatanodeDetails, Long> sizeLeavingNode;
+  private PriorityQueue<DatanodeUsageInfo> potentialSources;
+  private NodeManager nodeManager;
+  private ContainerBalancerConfiguration config;
+  private Double lowerLimit;
+
+  FindSourceGreedy(NodeManager nodeManager) {
+    sizeLeavingNode = new HashMap<>();
+    potentialSources = new PriorityQueue<>((a, b) -> {
+      double currentUsageOfA = a.calculateUtilization(
+          -sizeLeavingNode.get(a.getDatanodeDetails()));
+      double currentUsageOfB = b.calculateUtilization(
+          -sizeLeavingNode.get(b.getDatanodeDetails()));
+      //in descending order
+      int ret = Double.compare(currentUsageOfB, currentUsageOfA);
+      if (ret != 0) {
+        return ret;
+      }
+      UUID uuidA = a.getDatanodeDetails().getUuid();
+      UUID uuidB = b.getDatanodeDetails().getUuid();
+      return uuidA.compareTo(uuidB);
+    });
+    this.nodeManager = nodeManager;
+  }
+
+  private void setLowerLimit(Double lowerLimit) {
+    this.lowerLimit = lowerLimit;
+  }
+
+  private void setPotentialSources(
+      List<DatanodeUsageInfo> potentialSourceDataNodes) {
+    potentialSources.clear();
+    sizeLeavingNode.clear();
+    potentialSourceDataNodes.forEach(
+        c -> sizeLeavingNode.put(c.getDatanodeDetails(), 0L));
+    potentialSources.addAll(potentialSourceDataNodes);
+  }
+
+  private void setConfiguration(ContainerBalancerConfiguration conf) {
+    this.config = conf;
+  }
+
+  /**
+   * increase the Leaving size of a candidate source data node.
+   */
+  @Override
+  public void increaseSizeLeaving(DatanodeDetails dui, long size) {
+    Long currentSize = sizeLeavingNode.get(dui);
+    if(currentSize != null) {
+      sizeLeavingNode.put(dui, currentSize + size);
+      //reorder according to the latest sizeLeavingNode
+      potentialSources.add(nodeManager.getUsageInfo(dui));
+      return;
+    }
+    LOG.warn("Cannot find datanode {} in candidate source datanodes",
+        dui.getUuid());
+  }
+
+  /**
+   * get the next candidate source data node according to
+   * the strategy.
+   *
+   * @return the next candidate source data node.
+   */
+  @Override
+  public DatanodeDetails getNextCandidateSourceDataNode() {
+    if (potentialSources.isEmpty()) {
+      LOG.info("no more candidate source data node");
+      return null;
+    }
+    return potentialSources.poll().getDatanodeDetails();
+  }
+
+  /**
+   * remove the specified data node from candidate source
+   * data nodes.
+   */
+  @Override
+  public void removeCandidateSourceDataNode(DatanodeDetails dui){
+    potentialSources.removeIf(a -> a.getDatanodeDetails().equals(dui));
+  }
+
+  /**
+   * Checks if specified size can leave a specified source datanode
+   * according to {@link ContainerBalancerConfiguration}
+   * "size.leaving.source.max".
+   *
+   * @param source source datanode from which size is leaving
+   * @param size   size in bytes
+   * @return true if size can leave, else false
+   */
+  @Override
+  public boolean canSizeLeaveSource(DatanodeDetails source, long size) {
+    if (sizeLeavingNode.containsKey(source)) {
+      long sizeLeavingAfterMove = sizeLeavingNode.get(source) + size;
+      //size can be moved out of source datanode only when the following
+      //two conditions are met.
+      //1 sizeLeavingAfterMove does not exceed the configured
+      // MaxSizeLeavingSource
+      //2 after subtracting sizeLeavingAfterMove, the usage is bigger
+      // than or equal to lowerLimit
+      return sizeLeavingAfterMove <= config.getMaxSizeLeavingSource() &&
+          Double.compare(nodeManager.getUsageInfo(source)
+              .calculateUtilization(-sizeLeavingAfterMove), lowerLimit) >= 0;
+    }
+    return false;
+  }
+
+  /**
+   * reInitialize FindSourceStrategy.
+   */
+  @Override
+  public void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+                           ContainerBalancerConfiguration conf,
+                           Double lowLimit) {
+    setConfiguration(conf);
+    setLowerLimit(lowLimit);
+    setPotentialSources(potentialDataNodes);
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java
new file mode 100644
index 0000000..826ecac
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+
+import java.util.List;
+
+/**
+ * This interface can be used to implement strategies to get a
+ * source datanode.
+ */
+public interface FindSourceStrategy {
+
+  /**
+   * get the next candidate source data node according to
+   * the strategy.
+   *
+   * @return the next candidate source data node.
+   */
+  DatanodeDetails getNextCandidateSourceDataNode();
+
+  /**
+   * remove the specified data node from candidate source
+   * data nodes.
+   */
+  void removeCandidateSourceDataNode(DatanodeDetails dui);
+
+  /**
+   * increase the Leaving size of a candidate source data node.
+   */
+  void increaseSizeLeaving(DatanodeDetails dui, long size);
+
+  /**
+   * Checks if specified size can leave a specified source datanode
+   * according to {@link ContainerBalancerConfiguration}
+   * "size.leaving.source.max".
+   *
+   * @param source source datanode from which size is leaving
+   * @param size   size in bytes
+   * @return true if size can leave, else false
+   */
+  boolean canSizeLeaveSource(DatanodeDetails source, long size);
+
+  /**
+   * reInitialize FindSourceStrategy.
+   */
+  void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+                    ContainerBalancerConfiguration config, Double lowerLimit);
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
index 9d3deb4..5508943 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java
@@ -26,13 +26,17 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.TreeSet;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -44,37 +48,72 @@ public class FindTargetGreedy implements FindTargetStrategy {
 
   private ContainerManager containerManager;
   private PlacementPolicy placementPolicy;
+  private Map<DatanodeDetails, Long> sizeEnteringNode;
+  private NodeManager nodeManager;
+  private ContainerBalancerConfiguration config;
+  private Double upperLimit;
+  private TreeSet<DatanodeUsageInfo> potentialTargets;
 
   public FindTargetGreedy(
       ContainerManager containerManager,
-      PlacementPolicy placementPolicy) {
+      PlacementPolicy placementPolicy,
+      NodeManager nodeManager) {
+    sizeEnteringNode = new HashMap<>(); // read by the comparator below
     this.containerManager = containerManager;
     this.placementPolicy = placementPolicy;
+    this.nodeManager = nodeManager;
+
+    potentialTargets = new TreeSet<>((a, b) -> {
+      double currentUsageOfA = a.calculateUtilization(
+          sizeEnteringNode.get(a.getDatanodeDetails()));
+      double currentUsageOfB = b.calculateUtilization(
+          sizeEnteringNode.get(b.getDatanodeDetails()));
+      int ret = Double.compare(currentUsageOfA, currentUsageOfB);
+      if (ret != 0) { // lower utilization (incl. entering size) sorts first
+        return ret;
+      }
+      UUID uuidA = a.getDatanodeDetails().getUuid();
+      UUID uuidB = b.getDatanodeDetails().getUuid();
+      return uuidA.compareTo(uuidB); // tie-break: equal usage != same node
+    });
+  }
+
+  private void setUpperLimit(Double upperLimit){
+    this.upperLimit = upperLimit; // cap used by canSizeEnterTarget
+  }
+
+  private void setPotentialTargets(
+      List<DatanodeUsageInfo> potentialTargetDataNodes) {
+    sizeEnteringNode.clear();
+    potentialTargetDataNodes.forEach(
+        p -> sizeEnteringNode.put(p.getDatanodeDetails(), 0L));
+    potentialTargets.clear(); // sizes are set first: the comparator reads them
+    potentialTargets.addAll(potentialTargetDataNodes);
+  }
+
+  private void setConfiguration(ContainerBalancerConfiguration conf) {
+    this.config = conf; // supplies getMaxSizeEnteringTarget()
   }
 
   /**
    * Find a {@link ContainerMoveSelection} consisting of a target and
    * container to move for a source datanode. Favours more under-utilized nodes.
    * @param source Datanode to find a target for
-   * @param potentialTargets Collection of potential target datanodes
    * @param candidateContainers Set of candidate containers satisfying
    *                            selection criteria
    *                            {@link ContainerBalancerSelectionCriteria}
-   * @param canSizeEnterTarget A functional interface whose apply
    * (DatanodeDetails, Long) method returns true if the size specified in the
    * second argument can enter the specified DatanodeDetails node
    * @return Found target and container
    */
   @Override
   public ContainerMoveSelection findTargetForContainerMove(
-      DatanodeDetails source, Collection<DatanodeDetails> potentialTargets,
-      Set<ContainerID> candidateContainers,
-      BiFunction<DatanodeDetails, Long, Boolean> canSizeEnterTarget) {
-    for (DatanodeDetails target : potentialTargets) {
+      DatanodeDetails source, Set<ContainerID> candidateContainers) {
+    for (DatanodeUsageInfo targetInfo : potentialTargets) {
+      DatanodeDetails target = targetInfo.getDatanodeDetails();
       for (ContainerID container : candidateContainers) {
         Set<ContainerReplica> replicas;
         ContainerInfo containerInfo;
-
         try {
           replicas = containerManager.getContainerReplicas(container);
           containerInfo = containerManager.getContainer(container);
@@ -88,7 +127,7 @@ public class FindTargetGreedy implements FindTargetStrategy {
             replica -> replica.getDatanodeDetails().equals(target)) &&
             containerMoveSatisfiesPlacementPolicy(container, replicas, source,
             target) &&
-            canSizeEnterTarget.apply(target, containerInfo.getUsedBytes())) {
+            canSizeEnterTarget(target, containerInfo.getUsedBytes())) {
           return new ContainerMoveSelection(target, container);
         }
       }
@@ -107,8 +146,7 @@ public class FindTargetGreedy implements FindTargetStrategy {
    * @param target Target datanode for container move
    * @return true if placement policy is satisfied, otherwise false
    */
-  @Override
-  public boolean containerMoveSatisfiesPlacementPolicy(
+  private boolean containerMoveSatisfiesPlacementPolicy(
       ContainerID containerID, Set<ContainerReplica> replicas,
       DatanodeDetails source, DatanodeDetails target) {
     ContainerInfo containerInfo;
@@ -132,4 +170,61 @@ public class FindTargetGreedy implements FindTargetStrategy {
 
     return placementStatus.isPolicySatisfied();
   }
+
+  /**
+   * Checks if specified size can enter specified target datanode without
+   * exceeding {@link ContainerBalancerConfiguration}
+   * "size.entering.target.max" or pushing its utilization above upperLimit.
+   *
+   * @param target target datanode in which size is entering
+   * @param size   size in bytes
+   * @return true if size can enter target, else false (also if untracked)
+   */
+  private boolean canSizeEnterTarget(DatanodeDetails target, long size) {
+    if (sizeEnteringNode.containsKey(target)) {
+      long sizeEnteringAfterMove = sizeEnteringNode.get(target) + size;
+      // Size can be moved into the target datanode only when both of the
+      // following conditions are met:
+      // 1. sizeEnteringAfterMove does not exceed the configured
+      //    MaxSizeEnteringTarget
+      // 2. utilization of target datanode with sizeEnteringAfterMove added
+      //    is smaller than or equal to upperLimit
+      return sizeEnteringAfterMove <= config.getMaxSizeEnteringTarget() &&
+          Double.compare(nodeManager.getUsageInfo(target)
+              .calculateUtilization(sizeEnteringAfterMove), upperLimit) <= 0;
+    }
+    return false; // target is not a tracked candidate
+  }
+
+  /**
+   * Adds size to a target's entering size, then re-sorts or drops the target.
+   */
+  @Override
+  public void increaseSizeEntering(DatanodeDetails target, long size) {
+    if(sizeEnteringNode.containsKey(target)) {
+      long totalEnteringSize = sizeEnteringNode.get(target) + size;
+      sizeEnteringNode.put(target, totalEnteringSize);
+      potentialTargets.removeIf(
+          c -> c.getDatanodeDetails().equals(target));
+      if(totalEnteringSize < config.getMaxSizeEnteringTarget()) {
+        // re-add with fresh usage info so the set re-sorts this target
+        potentialTargets.add(nodeManager.getUsageInfo(target));
+      }
+      return;
+    }
+    LOG.warn("Cannot find {} in the candidates target nodes",
+        target.getUuid());
+  }
+
+  /**
+   * Re-initializes this strategy with the given new parameters.
+   */
+  @Override
+  public void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+                           ContainerBalancerConfiguration conf,
+                           Double upLimit) {
+    setConfiguration(conf);
+    setUpperLimit(upLimit);
+    setPotentialTargets(potentialDataNodes); // also zeroes entering sizes
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java
index 444f365..de87860 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java
@@ -20,11 +20,10 @@ package org.apache.hadoop.hdds.scm.container.balancer;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
 
-import java.util.Collection;
+import java.util.List;
 import java.util.Set;
-import java.util.function.BiFunction;
 
 /**
  * This interface can be used to implement strategies to find a target for a
@@ -39,33 +38,26 @@ public interface FindTargetStrategy {
    * enter a potential target.
    *
    * @param source Datanode to find a target for
-   * @param potentialTargets Collection of potential target datanodes
    * @param candidateContainers Set of candidate containers satisfying
    *                            selection criteria
    *                            {@link ContainerBalancerSelectionCriteria}
-   * @param canSizeEnterTarget A functional interface whose apply
    * (DatanodeDetails, Long) method returns true if the size specified in the
    * second argument can enter the specified DatanodeDetails node
    * @return {@link ContainerMoveSelection} containing the target node and
    * selected container
    */
   ContainerMoveSelection findTargetForContainerMove(
-      DatanodeDetails source, Collection<DatanodeDetails> potentialTargets,
-      Set<ContainerID> candidateContainers,
-      BiFunction<DatanodeDetails, Long, Boolean> canSizeEnterTarget);
+      DatanodeDetails source, Set<ContainerID> candidateContainers);
 
   /**
-   * Checks whether moving the specified container from the specified source
-   * to target datanode will satisfy the placement policy.
-   *
-   * @param containerID Container to be moved from source to target
-   * @param replicas Set of replicas of the given container
-   * @param source Source datanode for container move
-   * @param target Target datanode for container move
-   * @return true if placement policy is satisfied
+   * Increases the size entering a candidate target datanode.
    */
-  boolean containerMoveSatisfiesPlacementPolicy(ContainerID containerID,
-                                                Set<ContainerReplica> replicas,
-                                                DatanodeDetails source,
-                                                DatanodeDetails target);
+  void increaseSizeEntering(DatanodeDetails target, long size);
+
+  /**
+   * Resets this strategy with new candidate nodes, config and upper limit.
+   */
+  void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
+                    ContainerBalancerConfiguration config, Double upperLimit);
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to