This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1356b93aa6 HDDS-8936. Eliminate unnecessary streams in
SCMContainerPlacementRackScatter (#5002)
1356b93aa6 is described below
commit 1356b93aa6c73f92f049353bb1d53b9438f34e4b
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Jun 29 13:29:12 2023 +0200
HDDS-8936. Eliminate unnecessary streams in
SCMContainerPlacementRackScatter (#5002)
---
.../org/apache/hadoop/hdds/scm/net/NetUtils.java | 10 +-
.../hadoop/hdds/scm/net/NetworkTopologyImpl.java | 22 ++---
.../SCMContainerPlacementRackScatter.java | 103 ++++++++++-----------
3 files changed, 62 insertions(+), 73 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
index b13b006e77..7463c52e95 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
@@ -27,7 +27,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
/**
* Utility class to facilitate network topology functions.
@@ -104,18 +103,15 @@ public final class NetUtils {
continue;
}
// excludedScope is child of ancestor
- List<String> duplicateList = mutableExcludedScopes.stream()
- .filter(ancestor::isAncestor)
- .collect(Collectors.toList());
- mutableExcludedScopes.removeAll(duplicateList);
+ mutableExcludedScopes.removeIf(ancestor::isAncestor);
// ancestor is covered by excludedScope
- mutableExcludedScopes.stream().forEach(scope -> {
+ for (String scope : mutableExcludedScopes) {
if (ancestor.isDescendant(scope)) {
// remove exclude node if it's covered by excludedScope
iterator.remove();
}
- });
+ }
}
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 784ff24aad..ef8bacddad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -27,12 +27,12 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
@@ -576,11 +576,10 @@ public class NetworkTopologyImpl implements NetworkTopology {
if (LOG.isDebugEnabled()) {
LOG.debug("Start choosing node[scope = {}, index = {}, excludedScopes = "
+ "{}, excludedNodes = {}, affinityNode = {}, ancestorGen = {}",
- scope, leafIndex, (excludedScopes == null ? "" :
- excludedScopes.stream().collect(Collectors.joining(", "))),
- (excludedNodes == null ? "" : excludedNodes.stream()
- .map(Object::toString).collect(Collectors.joining(", "))),
- affinityNode == null ? "" : affinityNode.toString(), ancestorGen);
+ scope, leafIndex,
+ excludedScopes == null ? "" : excludedScopes,
+ excludedNodes == null ? "" : excludedNodes,
+ affinityNode == null ? "" : affinityNode, ancestorGen);
}
String finalScope = scope;
@@ -617,8 +616,9 @@ public class NetworkTopologyImpl implements NetworkTopology {
}
// excludeScope and finalScope share nothing case
if (scopeNode.isAncestor(s)) {
- if (mutableExcludedScopes.stream().
- noneMatch(n -> getNode(s).isDescendant(n))) {
+ Node node = getNode(s);
+ if (node != null &&
+ mutableExcludedScopes.stream().noneMatch(node::isDescendant)) {
mutableExcludedScopes.add(s);
}
}
@@ -626,7 +626,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
}
// clone excludedNodes before remove duplicate in it
- Collection<Node> mutableExNodes = new ArrayList<>();
+ Collection<Node> mutableExNodes = new LinkedHashSet<>();
// add affinity node to mutableExNodes
if (affinityNode != null) {
@@ -636,8 +636,6 @@ public class NetworkTopologyImpl implements NetworkTopology {
// Remove duplicate in excludedNodes
if (excludedNodes != null) {
mutableExNodes.addAll(excludedNodes);
- mutableExNodes =
- mutableExNodes.stream().distinct().collect(Collectors.toList());
}
// remove duplicate in mutableExNodes and mutableExcludedScopes
@@ -888,7 +886,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
private void checkExcludedScopes(List<String> excludedScopes) {
if (!CollectionUtils.isEmpty(excludedScopes)) {
- excludedScopes.stream().forEach(scope -> {
+ excludedScopes.forEach(scope -> {
if (scope.startsWith(SCOPE_REVERSE_STR)) {
throw new IllegalArgumentException("excludedScope " + scope +
" cannot start with " + SCOPE_REVERSE_STR);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index 2d5ade3993..d829c4f671 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
@@ -33,16 +32,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
@@ -97,20 +93,16 @@ public final class SCMContainerPlacementRackScatter
this.metrics = null;
}
- public Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
- List<Node> unavailableNodes,
- List<DatanodeDetails> mutableFavoredNodes,
- int nodesRequired, final Pair<Long, Long> metadatasizeDatasizePair,
- int maxOuterLoopIterations, final Pair<Map<Node, Integer>, Integer>
- rackCntMapMaxReplicaPerRackPair) {
+ @SuppressWarnings("checkstyle:parameternumber")
+ private Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
+ List<Node> unavailableNodes,
+ List<DatanodeDetails> mutableFavoredNodes,
+ int nodesRequired, long metadataSizeRequired, long dataSizeRequired,
+ int maxOuterLoopIterations, Map<Node, Integer> rackCntMap,
+ int maxReplicasPerRack) {
if (nodesRequired <= 0) {
return Collections.emptySet();
}
- final long metadataSizeRequired = metadatasizeDatasizePair.getKey();
- final long dataSizeRequired = metadatasizeDatasizePair.getValue();
- final Map<Node, Integer> rackCntMap =
- rackCntMapMaxReplicaPerRackPair.getKey();
- final int maxReplicasPerRack = rackCntMapMaxReplicaPerRackPair.getValue();
List<Node> toChooseRacks = new LinkedList<>();
Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>();
Set<Node> skippedRacks = new HashSet<>();
@@ -124,9 +116,11 @@ public final class SCMContainerPlacementRackScatter
// Refill toChooseRacks, we put skippedRacks in front of toChooseRacks
// for an even distribution
- toChooseRacks.addAll(racks.stream()
- .filter(rack -> rackCntMap.getOrDefault(rack, 0)
- < maxReplicasPerRack).collect(Collectors.toList()));
+ for (Node rack : racks) {
+ if (rackCntMap.getOrDefault(rack, 0) < maxReplicasPerRack) {
+ toChooseRacks.add(rack);
+ }
+ }
if (!skippedRacks.isEmpty()) {
toChooseRacks.removeAll(skippedRacks);
toChooseRacks.addAll(0, skippedRacks);
@@ -203,7 +197,7 @@ public final class SCMContainerPlacementRackScatter
* allocator, whether the favored nodes will be used
* depends on whether the nodes meets the allocator's
* requirement.
- * @param nodesRequiredToChoose - number of datanodes required.
+ * @param nodesRequired - number of datanodes required.
* @param dataSizeRequired - size required for the container.
* @param metadataSizeRequired - size required for Ratis metadata.
* @return List of datanodes.
@@ -215,17 +209,16 @@ public final class SCMContainerPlacementRackScatter
List<DatanodeDetails> usedNodes,
final List<DatanodeDetails> excludedNodes,
final List<DatanodeDetails> favoredNodes,
- final int nodesRequiredToChoose, final long metadataSizeRequired,
+ final int nodesRequired, final long metadataSizeRequired,
final long dataSizeRequired) throws SCMException {
- if (nodesRequiredToChoose <= 0) {
+ if (nodesRequired <= 0) {
String errorMsg = "num of nodes required to choose should bigger" +
- "than 0, but the given num is " + nodesRequiredToChoose;
+ "than 0, but the given num is " + nodesRequired;
throw new SCMException(errorMsg, null);
}
if (metrics != null) {
- metrics.incrDatanodeRequestCount(nodesRequiredToChoose);
+ metrics.incrDatanodeRequestCount(nodesRequired);
}
- int nodesRequired = nodesRequiredToChoose;
int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
int usedNodesCount = usedNodes == null ? 0 : usedNodes.size();
List<Node> availableNodes = networkTopology.getNodes(
@@ -264,11 +257,13 @@ public final class SCMContainerPlacementRackScatter
usedNodes = Collections.emptyList();
}
List<Node> racks = getAllRacks();
- Map<Node, Integer> usedRacksCntMap = usedNodes.stream()
- .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
- .filter(node -> node != null)
- .collect(Collectors.toMap(Function.identity(), e -> 1,
- Math::addExact));
+ Map<Node, Integer> usedRacksCntMap = new HashMap<>();
+ for (Node node : usedNodes) {
+ Node rack = networkTopology.getAncestor(node, RACK_LEVEL);
+ if (rack != null) {
+ usedRacksCntMap.merge(rack, 1, Math::addExact);
+ }
+ }
int requiredReplicationFactor = usedNodes.size() + nodesRequired;
int numberOfRacksRequired =
getRequiredRackCount(requiredReplicationFactor);
int additionalRacksRequired =
@@ -290,13 +285,10 @@ public final class SCMContainerPlacementRackScatter
// For excluded nodes, we sort their racks at rear
racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
- List<Node> unavailableNodes = new ArrayList<>();
+ List<Node> unavailableNodes = new ArrayList<>(usedNodes);
if (excludedNodes != null) {
unavailableNodes.addAll(excludedNodes);
}
- unavailableNodes.addAll(usedNodes);
-
- Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>();
if (racks.size() < additionalRacksRequired) {
String reason = "Number of existing racks: " + racks.size()
@@ -311,10 +303,11 @@ public final class SCMContainerPlacementRackScatter
FAILED_TO_FIND_SUITABLE_NODE);
}
- chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
+ Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>(
+ chooseNodesFromRacks(racks, unavailableNodes,
mutableFavoredNodes, additionalRacksRequired,
- Pair.of(metadataSizeRequired, dataSizeRequired), 1,
- Pair.of(usedRacksCntMap, maxReplicasPerRack)));
+ metadataSizeRequired, dataSizeRequired, 1,
+ usedRacksCntMap, maxReplicasPerRack));
if (chosenNodes.size() < additionalRacksRequired) {
String reason = "Chosen nodes size from Unique Racks: " + chosenNodes
@@ -334,14 +327,14 @@ public final class SCMContainerPlacementRackScatter
racks.addAll(usedRacksCntMap.keySet());
chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
mutableFavoredNodes, nodesRequired - chosenNodes.size(),
- Pair.of(metadataSizeRequired, dataSizeRequired),
- Integer.MAX_VALUE, Pair.of(usedRacksCntMap, maxReplicasPerRack)));
+ metadataSizeRequired, dataSizeRequired,
+ Integer.MAX_VALUE, usedRacksCntMap, maxReplicasPerRack));
}
List<DatanodeDetails> result = new ArrayList<>(chosenNodes);
- if (nodesRequiredToChoose != chosenNodes.size()) {
+ if (nodesRequired != chosenNodes.size()) {
String reason = "Chosen nodes size: " + chosenNodes
.size() + ", but required nodes to choose: "
- + nodesRequiredToChoose + " do not match.";
+ + nodesRequired + " do not match.";
LOG.warn("Placement policy could not choose the enough nodes."
+ " {} Available nodes count: {}, Excluded nodes count: {}, "
+ " Used nodes count: {}",
@@ -349,15 +342,15 @@ public final class SCMContainerPlacementRackScatter
throw new SCMException(reason,
SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
}
+ List<DatanodeDetails> newPlacement =
+ new ArrayList<>(usedNodes.size() + result.size());
+ newPlacement.addAll(usedNodes);
+ newPlacement.addAll(chosenNodes);
ContainerPlacementStatus placementStatus =
- validateContainerPlacement(Stream.of(usedNodes, result)
- .flatMap(List::stream).collect(Collectors.toList()),
- requiredReplicationFactor);
+ validateContainerPlacement(newPlacement, requiredReplicationFactor);
if (!placementStatus.isPolicySatisfied()) {
ContainerPlacementStatus initialPlacementStatus =
- validateContainerPlacement(Stream.of(usedNodes).flatMap(
- List::stream).collect(Collectors.toList()),
- requiredReplicationFactor);
+ validateContainerPlacement(usedNodes, requiredReplicationFactor);
if (initialPlacementStatus.misReplicationCount()
< placementStatus.misReplicationCount()) {
String errorMsg = "ContainerPlacementPolicy not met. Misreplication" +
@@ -470,13 +463,15 @@ public final class SCMContainerPlacementRackScatter
&& usedRacks.isEmpty()) {
return racks;
}
- Set<Node> lessPreferredRacks = excludedNodes.stream()
- .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
- // Dead Nodes have been removed from the topology and so have a
- // null rack. We need to exclude those from the rack list.
- .filter(Objects::nonNull)
- .filter(node -> !usedRacks.containsKey(node))
- .collect(Collectors.toSet());
+ Set<Node> lessPreferredRacks = new HashSet<>();
+ for (Node node : excludedNodes) {
+ Node rack = networkTopology.getAncestor(node, RACK_LEVEL);
+ // Dead Nodes have been removed from the topology and so have a
+ // null rack. We need to exclude those from the rack list.
+ if (rack != null && !usedRacks.containsKey(rack)) {
+ lessPreferredRacks.add(rack);
+ }
+ }
List <Node> result = new ArrayList<>();
for (Node rack : racks) {
if (!usedRacks.containsKey(rack) && !lessPreferredRacks.contains(rack)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]