This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1356b93aa6 HDDS-8936. Eliminate unnecessary streams in SCMContainerPlacementRackScatter (#5002)
1356b93aa6 is described below

commit 1356b93aa6c73f92f049353bb1d53b9438f34e4b
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Jun 29 13:29:12 2023 +0200

    HDDS-8936. Eliminate unnecessary streams in SCMContainerPlacementRackScatter (#5002)
---
 .../org/apache/hadoop/hdds/scm/net/NetUtils.java   |  10 +-
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   |  22 ++---
 .../SCMContainerPlacementRackScatter.java          | 103 ++++++++++-----------
 3 files changed, 62 insertions(+), 73 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
index b13b006e77..7463c52e95 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
@@ -27,7 +27,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 /**
  * Utility class to facilitate network topology functions.
@@ -104,18 +103,15 @@ public final class NetUtils {
         continue;
       }
       // excludedScope is child of ancestor
-      List<String> duplicateList = mutableExcludedScopes.stream()
-          .filter(ancestor::isAncestor)
-          .collect(Collectors.toList());
-      mutableExcludedScopes.removeAll(duplicateList);
+      mutableExcludedScopes.removeIf(ancestor::isAncestor);
 
       // ancestor is covered by excludedScope
-      mutableExcludedScopes.stream().forEach(scope -> {
+      for (String scope : mutableExcludedScopes) {
         if (ancestor.isDescendant(scope)) {
           // remove exclude node if it's covered by excludedScope
           iterator.remove();
         }
-      });
+      }
     }
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 784ff24aad..ef8bacddad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -27,12 +27,12 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
@@ -576,11 +576,10 @@ public class NetworkTopologyImpl implements NetworkTopology {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Start choosing node[scope = {}, index = {}, excludedScopes = "
               + "{}, excludedNodes = {}, affinityNode = {}, ancestorGen = {}",
-          scope, leafIndex, (excludedScopes == null ? "" :
-              excludedScopes.stream().collect(Collectors.joining(", "))),
-          (excludedNodes == null ? "" : excludedNodes.stream()
-              .map(Object::toString).collect(Collectors.joining(", "))),
-          affinityNode == null ? "" : affinityNode.toString(), ancestorGen);
+          scope, leafIndex,
+          excludedScopes == null ? "" : excludedScopes,
+          excludedNodes == null ? "" : excludedNodes,
+          affinityNode == null ? "" : affinityNode, ancestorGen);
     }
 
     String finalScope = scope;
@@ -617,8 +616,9 @@ public class NetworkTopologyImpl implements NetworkTopology {
         }
         // excludeScope and finalScope share nothing case
         if (scopeNode.isAncestor(s)) {
-          if (mutableExcludedScopes.stream().
-              noneMatch(n -> getNode(s).isDescendant(n))) {
+          Node node = getNode(s);
+          if (node != null &&
+              mutableExcludedScopes.stream().noneMatch(node::isDescendant)) {
             mutableExcludedScopes.add(s);
           }
         }
@@ -626,7 +626,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
     }
 
     // clone excludedNodes before remove duplicate in it
-    Collection<Node> mutableExNodes = new ArrayList<>();
+    Collection<Node> mutableExNodes = new LinkedHashSet<>();
 
     // add affinity node to mutableExNodes
     if (affinityNode != null) {
@@ -636,8 +636,6 @@ public class NetworkTopologyImpl implements NetworkTopology {
     // Remove duplicate in excludedNodes
     if (excludedNodes != null) {
       mutableExNodes.addAll(excludedNodes);
-      mutableExNodes =
-          mutableExNodes.stream().distinct().collect(Collectors.toList());
     }
 
     // remove duplicate in mutableExNodes and mutableExcludedScopes
@@ -888,7 +886,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
 
   private void checkExcludedScopes(List<String> excludedScopes) {
     if (!CollectionUtils.isEmpty(excludedScopes)) {
-      excludedScopes.stream().forEach(scope -> {
+      excludedScopes.forEach(scope -> {
         if (scope.startsWith(SCOPE_REVERSE_STR)) {
           throw new IllegalArgumentException("excludedScope " + scope +
               " cannot start with " + SCOPE_REVERSE_STR);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index 2d5ade3993..d829c4f671 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdds.scm.container.placement.algorithms;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
@@ -33,16 +32,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
 
@@ -97,20 +93,16 @@ public final class SCMContainerPlacementRackScatter
     this.metrics = null;
   }
 
-  public Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
-       List<Node> unavailableNodes,
-       List<DatanodeDetails> mutableFavoredNodes,
-       int nodesRequired, final Pair<Long, Long> metadatasizeDatasizePair,
-       int maxOuterLoopIterations, final Pair<Map<Node, Integer>, Integer>
-       rackCntMapMaxReplicaPerRackPair) {
+  @SuppressWarnings("checkstyle:parameternumber")
+  private Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
+      List<Node> unavailableNodes,
+      List<DatanodeDetails> mutableFavoredNodes,
+      int nodesRequired, long metadataSizeRequired, long dataSizeRequired,
+      int maxOuterLoopIterations, Map<Node, Integer> rackCntMap,
+      int maxReplicasPerRack) {
     if (nodesRequired <= 0) {
       return Collections.emptySet();
     }
-    final long metadataSizeRequired = metadatasizeDatasizePair.getKey();
-    final long dataSizeRequired = metadatasizeDatasizePair.getValue();
-    final Map<Node, Integer> rackCntMap =
-            rackCntMapMaxReplicaPerRackPair.getKey();
-    final int maxReplicasPerRack = rackCntMapMaxReplicaPerRackPair.getValue();
     List<Node> toChooseRacks = new LinkedList<>();
     Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>();
     Set<Node> skippedRacks = new HashSet<>();
@@ -124,9 +116,11 @@ public final class SCMContainerPlacementRackScatter
 
       // Refill toChooseRacks, we put skippedRacks in front of toChooseRacks
       // for an even distribution
-      toChooseRacks.addAll(racks.stream()
-              .filter(rack -> rackCntMap.getOrDefault(rack, 0)
-                      < maxReplicasPerRack).collect(Collectors.toList()));
+      for (Node rack : racks) {
+        if (rackCntMap.getOrDefault(rack, 0) < maxReplicasPerRack) {
+          toChooseRacks.add(rack);
+        }
+      }
       if (!skippedRacks.isEmpty()) {
         toChooseRacks.removeAll(skippedRacks);
         toChooseRacks.addAll(0, skippedRacks);
@@ -203,7 +197,7 @@ public final class SCMContainerPlacementRackScatter
    *                     allocator, whether the favored nodes will be used
    *                     depends on whether the nodes meets the allocator's
    *                     requirement.
-   * @param nodesRequiredToChoose - number of datanodes required.
+   * @param nodesRequired - number of datanodes required.
    * @param dataSizeRequired - size required for the container.
    * @param metadataSizeRequired - size required for Ratis metadata.
    * @return List of datanodes.
@@ -215,17 +209,16 @@ public final class SCMContainerPlacementRackScatter
           List<DatanodeDetails> usedNodes,
           final List<DatanodeDetails> excludedNodes,
           final List<DatanodeDetails> favoredNodes,
-          final int nodesRequiredToChoose, final long metadataSizeRequired,
+          final int nodesRequired, final long metadataSizeRequired,
           final long dataSizeRequired) throws SCMException {
-    if (nodesRequiredToChoose <= 0) {
+    if (nodesRequired <= 0) {
       String errorMsg = "num of nodes required to choose should bigger" +
-          "than 0, but the given num is " + nodesRequiredToChoose;
+          "than 0, but the given num is " + nodesRequired;
       throw new SCMException(errorMsg, null);
     }
     if (metrics != null) {
-      metrics.incrDatanodeRequestCount(nodesRequiredToChoose);
+      metrics.incrDatanodeRequestCount(nodesRequired);
     }
-    int nodesRequired = nodesRequiredToChoose;
     int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
     int usedNodesCount = usedNodes == null ? 0 : usedNodes.size();
     List<Node> availableNodes = networkTopology.getNodes(
@@ -264,11 +257,13 @@ public final class SCMContainerPlacementRackScatter
       usedNodes = Collections.emptyList();
     }
     List<Node> racks = getAllRacks();
-    Map<Node, Integer> usedRacksCntMap = usedNodes.stream()
-            .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
-            .filter(node -> node != null)
-            .collect(Collectors.toMap(Function.identity(), e -> 1,
-                    Math::addExact));
+    Map<Node, Integer> usedRacksCntMap = new HashMap<>();
+    for (Node node : usedNodes) {
+      Node rack = networkTopology.getAncestor(node, RACK_LEVEL);
+      if (rack != null) {
+        usedRacksCntMap.merge(rack, 1, Math::addExact);
+      }
+    }
     int requiredReplicationFactor = usedNodes.size() + nodesRequired;
     int numberOfRacksRequired = 
getRequiredRackCount(requiredReplicationFactor);
     int additionalRacksRequired =
@@ -290,13 +285,10 @@ public final class SCMContainerPlacementRackScatter
     // For excluded nodes, we sort their racks at rear
     racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
 
-    List<Node> unavailableNodes = new ArrayList<>();
+    List<Node> unavailableNodes = new ArrayList<>(usedNodes);
     if (excludedNodes != null) {
       unavailableNodes.addAll(excludedNodes);
     }
-    unavailableNodes.addAll(usedNodes);
-
-    Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>();
 
     if (racks.size() < additionalRacksRequired) {
       String reason = "Number of existing racks: " + racks.size()
@@ -311,10 +303,11 @@ public final class SCMContainerPlacementRackScatter
               FAILED_TO_FIND_SUITABLE_NODE);
     }
 
-    chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
+    Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>(
+        chooseNodesFromRacks(racks, unavailableNodes,
             mutableFavoredNodes, additionalRacksRequired,
-            Pair.of(metadataSizeRequired, dataSizeRequired), 1,
-            Pair.of(usedRacksCntMap, maxReplicasPerRack)));
+            metadataSizeRequired, dataSizeRequired, 1,
+            usedRacksCntMap, maxReplicasPerRack));
 
     if (chosenNodes.size() < additionalRacksRequired) {
       String reason = "Chosen nodes size from Unique Racks: " + chosenNodes
@@ -334,14 +327,14 @@ public final class SCMContainerPlacementRackScatter
       racks.addAll(usedRacksCntMap.keySet());
       chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
               mutableFavoredNodes, nodesRequired - chosenNodes.size(),
-              Pair.of(metadataSizeRequired, dataSizeRequired),
-              Integer.MAX_VALUE, Pair.of(usedRacksCntMap, 
maxReplicasPerRack)));
+              metadataSizeRequired, dataSizeRequired,
+              Integer.MAX_VALUE, usedRacksCntMap, maxReplicasPerRack));
     }
     List<DatanodeDetails> result = new ArrayList<>(chosenNodes);
-    if (nodesRequiredToChoose != chosenNodes.size()) {
+    if (nodesRequired != chosenNodes.size()) {
       String reason = "Chosen nodes size: " + chosenNodes
               .size() + ", but required nodes to choose: "
-              + nodesRequiredToChoose + " do not match.";
+              + nodesRequired + " do not match.";
       LOG.warn("Placement policy could not choose the enough nodes."
                + " {} Available nodes count: {}, Excluded nodes count: {}, "
                + " Used nodes count: {}",
@@ -349,15 +342,15 @@ public final class SCMContainerPlacementRackScatter
       throw new SCMException(reason,
               SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
     }
+    List<DatanodeDetails> newPlacement =
+        new ArrayList<>(usedNodes.size() + result.size());
+    newPlacement.addAll(usedNodes);
+    newPlacement.addAll(chosenNodes);
     ContainerPlacementStatus placementStatus =
-        validateContainerPlacement(Stream.of(usedNodes, result)
-                .flatMap(List::stream).collect(Collectors.toList()),
-                requiredReplicationFactor);
+        validateContainerPlacement(newPlacement, requiredReplicationFactor);
     if (!placementStatus.isPolicySatisfied()) {
       ContainerPlacementStatus initialPlacementStatus =
-              validateContainerPlacement(Stream.of(usedNodes).flatMap(
-                      List::stream).collect(Collectors.toList()),
-                      requiredReplicationFactor);
+          validateContainerPlacement(usedNodes, requiredReplicationFactor);
       if (initialPlacementStatus.misReplicationCount()
               < placementStatus.misReplicationCount()) {
         String errorMsg = "ContainerPlacementPolicy not met. Misreplication" +
@@ -470,13 +463,15 @@ public final class SCMContainerPlacementRackScatter
             && usedRacks.isEmpty()) {
       return racks;
     }
-    Set<Node> lessPreferredRacks = excludedNodes.stream()
-        .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
-        // Dead Nodes have been removed from the topology and so have a
-        // null rack. We need to exclude those from the rack list.
-        .filter(Objects::nonNull)
-        .filter(node -> !usedRacks.containsKey(node))
-        .collect(Collectors.toSet());
+    Set<Node> lessPreferredRacks = new HashSet<>();
+    for (Node node : excludedNodes) {
+      Node rack = networkTopology.getAncestor(node, RACK_LEVEL);
+      // Dead Nodes have been removed from the topology and so have a
+      // null rack. We need to exclude those from the rack list.
+      if (rack != null && !usedRacks.containsKey(rack)) {
+        lessPreferredRacks.add(rack);
+      }
+    }
     List <Node> result = new ArrayList<>();
     for (Node rack : racks) {
       if (!usedRacks.containsKey(rack) && !lessPreferredRacks.contains(rack)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to