This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 823b505 HDDS-5326. EC: Create a new as many racks as possible
placement policy for EC (#2497)
823b505 is described below
commit 823b505ab51bd37c88dac7fc060e0996967b0135
Author: Symious <[email protected]>
AuthorDate: Fri Aug 27 01:10:58 2021 +0800
HDDS-5326. EC: Create a new as many racks as possible placement policy for
EC (#2497)
---
.../common/src/main/resources/ozone-default.xml | 2 +-
.../ContainerPlacementPolicyFactory.java | 5 +-
.../SCMContainerPlacementRackScatter.java | 321 +++++++++++++
.../algorithms/TestContainerPlacementFactory.java | 3 +-
.../TestSCMContainerPlacementRackScatter.java | 504 +++++++++++++++++++++
5 files changed, 832 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c26655f..7fbac28 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -823,7 +823,7 @@
<property>
<name>ozone.scm.container.placement.ec.impl</name>
<value>
-
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom
+
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter
</value>
<tag>OZONE, MANAGEMENT</tag>
<description>
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index 63367e6..9357084 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -40,6 +40,9 @@ public final class ContainerPlacementPolicyFactory {
private static final Class<? extends PlacementPolicy>
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT =
SCMContainerPlacementRandom.class;
+ private static final Class<? extends PlacementPolicy>
+ OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_DEFAULT =
+ SCMContainerPlacementRackScatter.class;
private ContainerPlacementPolicyFactory() {
}
@@ -64,7 +67,7 @@ public final class ContainerPlacementPolicyFactory {
// TODO: Change default placement policy for EC
final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY,
- OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
+ OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_DEFAULT,
PlacementPolicy.class);
return getPolicyInternal(placementClass, conf, nodeManager, clusterMap,
fallback, metrics);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
new file mode 100644
index 0000000..f3b2c0c
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Container placement policy that scatters datanodes across different racks,
+ * together with the space to satisfy the size constraints.
+ * <p>
+ * This placement policy will try to distribute datanodes on as many racks as
+ * possible.
+ * <p>
+ * This implementation applies to network topologies like "/rack/node". It is
+ * not recommended to use this if the network topology has more layers.
+ * <p>
+ */
+public final class SCMContainerPlacementRackScatter
+ extends SCMCommonPlacementPolicy {
+ @VisibleForTesting
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SCMContainerPlacementRackScatter.class);
+ private final NetworkTopology networkTopology;
+ private static final int RACK_LEVEL = 1;
+ // OUTER_LOOP is to avoid an infinite loop when choosing across all racks
+ private static final int OUTER_LOOP_MAX_RETRY= 3;
+ // INNER_LOOP is to choose node in each rack
+ private static final int INNER_LOOP_MAX_RETRY= 5;
+ private final SCMContainerPlacementMetrics metrics;
+
+ /**
+ * Constructs a Container Placement with rack awareness.
+ *
+ * @param nodeManager Node Manager
+ * @param conf Configuration
+ */
+ public SCMContainerPlacementRackScatter(final NodeManager nodeManager,
+ final ConfigurationSource conf, final NetworkTopology networkTopology,
+ boolean fallback, final SCMContainerPlacementMetrics metrics) {
+ super(nodeManager, conf);
+ this.networkTopology = networkTopology;
+ this.metrics = metrics;
+ }
+
+ /**
+ * Called by SCM to choose datanodes.
+ *
+ *
+ * @param excludedNodes - list of the datanodes to exclude.
+ * @param favoredNodes - list of nodes preferred. This is a hint to the
+ * allocator, whether the favored nodes will be used
+ * depends on whether the nodes meets the allocator's
+ * requirement.
+ * @param nodesRequired - number of datanodes required.
+ * @param metadataSizeRequired - size required for Ratis metadata.
+ * @param dataSizeRequired - size required for the container.
+ * @return List of datanodes.
+ * @throws SCMException SCMException
+ */
+ @Override
+ public List<DatanodeDetails> chooseDatanodes(
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+ int nodesRequired, long metadataSizeRequired, long dataSizeRequired)
+ throws SCMException {
+ Preconditions.checkArgument(nodesRequired > 0);
+ metrics.incrDatanodeRequestCount(nodesRequired);
+ int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT);
+ int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
+ if (datanodeCount < nodesRequired + excludedNodesCount) {
+ throw new SCMException("No enough datanodes to choose. " +
+ "TotalNode = " + datanodeCount +
+ " RequiredNode = " + nodesRequired +
+ " ExcludedNode = " + excludedNodesCount, null);
+ }
+
+ List<DatanodeDetails> mutableFavoredNodes = new ArrayList<>();
+ if (favoredNodes != null) {
+ // Generate mutableFavoredNodes, only stores valid favoredNodes
+ for (DatanodeDetails datanodeDetails : favoredNodes) {
+ if (isValidNode(datanodeDetails, metadataSizeRequired,
+ dataSizeRequired)) {
+ mutableFavoredNodes.add(datanodeDetails);
+ }
+ }
+ Collections.shuffle(mutableFavoredNodes);
+ }
+ if (excludedNodes != null) {
+ mutableFavoredNodes.removeAll(excludedNodes);
+ }
+
+ // For excluded nodes, we sort their racks to the rear
+ List<Node> racks = getAllRacks();
+ if (excludedNodes != null && excludedNodes.size() > 0) {
+ racks = sortRackWithExcludedNodes(racks, excludedNodes);
+ }
+
+ List<Node> toChooseRacks = new LinkedList<>(racks);
+ List<DatanodeDetails> chosenNodes = new ArrayList<>();
+ List<Node> unavailableNodes = new ArrayList<>();
+ Set<Node> skippedRacks = new HashSet<>();
+ if (excludedNodes != null) {
+ unavailableNodes.addAll(excludedNodes);
+ }
+
+ // If the result doesn't change after retryCount, we return with exception
+ int retryCount = 0;
+ while (nodesRequired > 0) {
+ if (retryCount > OUTER_LOOP_MAX_RETRY) {
+ throw new SCMException("No satisfied datanode to meet the" +
+ " excludedNodes and affinityNode constrains.", null);
+ }
+ int chosenListSize = chosenNodes.size();
+
+ // Refill toChooseRacks, we put skippedRacks in front of toChooseRacks
+ // for an even distribution
+ toChooseRacks.addAll(racks);
+ if (!skippedRacks.isEmpty()) {
+ toChooseRacks.removeAll(skippedRacks);
+ toChooseRacks.addAll(0, skippedRacks);
+ skippedRacks.clear();
+ }
+
+ if (mutableFavoredNodes.size() > 0) {
+ List<Node> chosenFavoredNodesInForLoop = new ArrayList<>();
+ for (DatanodeDetails favoredNode : mutableFavoredNodes) {
+ Node curRack = getRackOfDatanodeDetails(favoredNode);
+ if (toChooseRacks.contains(curRack)) {
+ chosenNodes.add(favoredNode);
+ toChooseRacks.remove(curRack);
+ chosenFavoredNodesInForLoop.add(favoredNode);
+ unavailableNodes.add(favoredNode);
+ nodesRequired--;
+ if (nodesRequired == 0) {
+ break;
+ }
+ }
+ }
+ mutableFavoredNodes.removeAll(chosenFavoredNodesInForLoop);
+ }
+
+ // If the requirement is already satisfied by favored nodes, stop choosing.
+ if (nodesRequired == 0) {
+ break;
+ }
+
+ for (Node rack : toChooseRacks) {
+ Node node = chooseNode(rack.getNetworkFullPath(), unavailableNodes,
+ metadataSizeRequired, dataSizeRequired);
+ if (node != null) {
+ chosenNodes.add((DatanodeDetails) node);
+ mutableFavoredNodes.remove(node);
+ unavailableNodes.add(node);
+ nodesRequired--;
+ if (nodesRequired == 0) {
+ break;
+ }
+ } else {
+ // Store the skipped racks to check them first in next outer loop
+ skippedRacks.add(rack);
+ }
+ }
+ // Clear toChooseRacks for this loop
+ toChooseRacks.clear();
+
+ // If chosenNodes not changed, increase the retryCount
+ if (chosenListSize == chosenNodes.size()) {
+ retryCount++;
+ } else {
+ // If chosenNodes changed, reset the retryCount
+ retryCount = 0;
+ }
+ }
+ ContainerPlacementStatus placementStatus =
+ validateContainerPlacement(chosenNodes, nodesRequired);
+ if (!placementStatus.isPolicySatisfied()) {
+ LOG.warn("ContainerPlacementPolicy not met, currentRacks is {}," +
+ " desired racks is {}.", placementStatus.actualPlacementCount(),
+ placementStatus.expectedPlacementCount());
+ }
+ return chosenNodes;
+ }
+
+ @Override
+ public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+ return null;
+ }
+
+ /**
+ * Choose a datanode which meets the requirements. If there is no node which
+ * meets all the requirements, there is a fallback choosing process depending on
+ * whether fallback is allowed when this class is instantiated.
+ *
+ *
+ * @param scope - the rack we are searching nodes under
+ * @param excludedNodes - list of the datanodes to exclude. Can be null.
+ * @param metadataSizeRequired - size required for Ratis metadata.
+ * @param dataSizeRequired - size required for the container.
+ * @return the chosen datanode.
+ */
+ private Node chooseNode(String scope, List<Node> excludedNodes,
+ long metadataSizeRequired, long dataSizeRequired) {
+ int maxRetry = INNER_LOOP_MAX_RETRY;
+ while(true) {
+ metrics.incrDatanodeChooseAttemptCount();
+ Node node = networkTopology.chooseRandom(scope, excludedNodes);
+ if (node == null) {
+ // cannot find a node which meets all constraints
+ LOG.warn("Failed to find the datanode for container. excludedNodes:" +
+ (excludedNodes == null ? "" : excludedNodes.toString()) +
+ ", rack:" + scope);
+ return null;
+ }
+
+ DatanodeDetails datanodeDetails = (DatanodeDetails)node;
+ if (isValidNode(datanodeDetails, metadataSizeRequired,
+ dataSizeRequired)) {
+ metrics.incrDatanodeChooseSuccessCount();
+ return node;
+ }
+ // exclude the unavailable node for the following retries.
+ excludedNodes.add(node);
+
+ maxRetry--;
+ if (maxRetry == 0) {
+ // avoid the infinite loop
+ String errMsg = "No satisfied datanode to meet the space constrains. "
+ + "metadatadata size required: " + metadataSizeRequired +
+ " data size required: " + dataSizeRequired;
+ LOG.info(errMsg);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * For EC placement policy, desired rack count would be equal to the num of
+ * Replicas.
+ * @param numReplicas - num of Replicas.
+ * @return required rack count.
+ */
+ @Override
+ protected int getRequiredRackCount(int numReplicas) {
+ if (networkTopology == null) {
+ return 1;
+ }
+ int maxLevel = networkTopology.getMaxLevel();
+ int numRacks = networkTopology.getNumOfNodes(maxLevel - 1);
+ // Return the num of Rack if numRack less than numReplicas
+ return Math.min(numRacks, numReplicas);
+ }
+
+ private Node getRackOfDatanodeDetails(DatanodeDetails datanodeDetails) {
+ String location = datanodeDetails.getNetworkLocation();
+ return networkTopology.getNode(location);
+ }
+
+ /**
+ * For the racks holding excludedNodes, we prefer not to choose from these
+ * racks, so we sort these racks to the rear.
+ * @param racks
+ * @param excludedNodes
+ * @return
+ */
+ private List<Node> sortRackWithExcludedNodes(List<Node> racks,
+ List<DatanodeDetails> excludedNodes) {
+ Set<Node> lessPreferredRacks = excludedNodes.stream()
+ .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
+ .collect(Collectors.toSet());
+ List <Node> result = new ArrayList<>();
+ for (Node rack : racks) {
+ if (!lessPreferredRacks.contains(rack)) {
+ result.add(rack);
+ }
+ }
+ result.addAll(lessPreferredRacks);
+ return result;
+ }
+
+ private List<Node> getAllRacks() {
+ int rackLevel = networkTopology.getMaxLevel()-1;
+ List<Node> racks = networkTopology.getNodes(rackLevel);
+ Collections.shuffle(racks);
+ return racks;
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index b6499a6..4efc559 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -169,7 +169,8 @@ public class TestContainerPlacementFactory {
public void testECPolicy() throws IOException {
PlacementPolicy policy = ContainerPlacementPolicyFactory
.getECPolicy(conf, null, null, true, null);
- Assert.assertSame(SCMContainerPlacementRandom.class, policy.getClass());
+ Assert.assertSame(SCMContainerPlacementRackScatter.class,
+ policy.getClass());
}
/**
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
new file mode 100644
index 0000000..5a31b6f
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the SCM container rack scatter placement.
+ */
+@RunWith(Parameterized.class)
+public class TestSCMContainerPlacementRackScatter {
+ private NetworkTopology cluster;
+ private OzoneConfiguration conf;
+ private NodeManager nodeManager;
+ private final Integer datanodeCount;
+ private final List<DatanodeDetails> datanodes = new ArrayList<>();
+ private final List<DatanodeInfo> dnInfos = new ArrayList<>();
+ // policy with fallback capability
+ private SCMContainerPlacementRackScatter policy;
+ // node storage capacity
+ private static final long STORAGE_CAPACITY = 100L;
+ private SCMContainerPlacementMetrics metrics;
+ private static final int NODE_PER_RACK = 5;
+
+ public TestSCMContainerPlacementRackScatter(Integer count) {
+ this.datanodeCount = count;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> setupDatanodes() {
+ return Arrays.asList(new Object[][] {{3}, {4}, {5}, {6}, {7}, {8}, {9},
+ {10}, {11}, {12}, {13}, {14}, {15}, {20}, {25}, {30}});
+ }
+
+ @Before
+ public void setup() {
+ //initialize network topology instance
+ conf = new OzoneConfiguration();
+ // We are using small units here
+ conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
+ 1, StorageUnit.BYTES);
+ NodeSchema[] schemas = new NodeSchema[]
+ {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+ NodeSchemaManager.getInstance().init(schemas, true);
+ cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+
+ // build datanodes, and network topology
+ String rack = "/rack";
+ String hostname = "node";
+ for (int i = 0; i < datanodeCount; i++) {
+ // Up to 6 racks, each holding up to 5 datanodes (depends on datanodeCount)
+ DatanodeDetails datanodeDetails =
+ MockDatanodeDetails.createDatanodeDetails(
+ hostname + i, rack + (i / NODE_PER_RACK));
+ DatanodeInfo datanodeInfo = new DatanodeInfo(
+ datanodeDetails, NodeStatus.inServiceHealthy(),
+ UpgradeUtils.defaultLayoutVersionProto());
+
+ StorageReportProto storage1 = TestUtils.createStorageReport(
+ datanodeInfo.getUuid(), "/data1-" + datanodeInfo.getUuidString(),
+ STORAGE_CAPACITY, 0, 100L, null);
+ MetadataStorageReportProto metaStorage1 =
+ TestUtils.createMetadataStorageReport(
+ "/metadata1-" + datanodeInfo.getUuidString(),
+ STORAGE_CAPACITY, 0, 100L, null);
+ datanodeInfo.updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage1)));
+ datanodeInfo.updateMetaDataStorageReports(
+ new ArrayList<>(Arrays.asList(metaStorage1)));
+
+ datanodes.add(datanodeDetails);
+ cluster.add(datanodeDetails);
+ dnInfos.add(datanodeInfo);
+ }
+
+ if (datanodeCount > 4) {
+ StorageReportProto storage2 = TestUtils.createStorageReport(
+ dnInfos.get(2).getUuid(),
+ "/data1-" + datanodes.get(2).getUuidString(),
+ STORAGE_CAPACITY, 90L, 10L, null);
+ dnInfos.get(2).updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage2)));
+ StorageReportProto storage3 = TestUtils.createStorageReport(
+ dnInfos.get(3).getUuid(),
+ "/data1-" + dnInfos.get(3).getUuidString(),
+ STORAGE_CAPACITY, 80L, 20L, null);
+ dnInfos.get(3).updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage3)));
+ StorageReportProto storage4 = TestUtils.createStorageReport(
+ dnInfos.get(4).getUuid(),
+ "/data1-" + dnInfos.get(4).getUuidString(),
+ STORAGE_CAPACITY, 70L, 30L, null);
+ dnInfos.get(4).updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage4)));
+ } else if (datanodeCount > 3) {
+ StorageReportProto storage2 = TestUtils.createStorageReport(
+ dnInfos.get(2).getUuid(),
+ "/data1-" + dnInfos.get(2).getUuidString(),
+ STORAGE_CAPACITY, 90L, 10L, null);
+ dnInfos.get(2).updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage2)));
+ StorageReportProto storage3 = TestUtils.createStorageReport(
+ dnInfos.get(3).getUuid(),
+ "/data1-" + dnInfos.get(3).getUuidString(),
+ STORAGE_CAPACITY, 80L, 20L, null);
+ dnInfos.get(3).updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage3)));
+ } else if (datanodeCount > 2) {
+ StorageReportProto storage2 = TestUtils.createStorageReport(
+ dnInfos.get(2).getUuid(),
+ "/data1-" + dnInfos.get(2).getUuidString(),
+ STORAGE_CAPACITY, 84L, 16L, null);
+ dnInfos.get(2).updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage2)));
+ }
+
+ // create mock node manager
+ nodeManager = Mockito.mock(NodeManager.class);
+ when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
+ .thenReturn(new ArrayList<>(datanodes));
+ for (DatanodeInfo dn: dnInfos) {
+ when(nodeManager.getNodeByUuid(dn.getUuidString()))
+ .thenReturn(dn);
+ }
+ when(nodeManager.getClusterNetworkTopologyMap())
+ .thenReturn(cluster);
+
+ // create placement policy instances
+ metrics = SCMContainerPlacementMetrics.create();
+ policy = new SCMContainerPlacementRackScatter(
+ nodeManager, conf, cluster, true, metrics);
+ }
+
+ @After
+ public void teardown() {
+ metrics.unRegister();
+ }
+
+ @Test
+ public void chooseNodeWithNoExcludedNodes() throws SCMException {
+ int rackLevel = cluster.getMaxLevel() - 1;
+ int rackNum = cluster.getNumOfNodes(rackLevel);
+
+ // test choose new datanodes for new pipeline cases
+ // 1 replica
+ int nodeNum = 1;
+ List<DatanodeDetails> datanodeDetails =
+ policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+
+ // 2 replicas
+ nodeNum = 2;
+ datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertTrue(!cluster.isSameParent(datanodeDetails.get(0),
+ datanodeDetails.get(1)) || (datanodeCount <= NODE_PER_RACK));
+
+ // 3 replicas
+ nodeNum = 3;
+ if (datanodeCount > nodeNum) {
+ datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(getRackSize(datanodeDetails),
+ Math.min(nodeNum, rackNum));
+ }
+
+ // 5 replicas
+ nodeNum = 5;
+ if (datanodeCount > nodeNum) {
+ assumeTrue(datanodeCount >= NODE_PER_RACK);
+ datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(getRackSize(datanodeDetails),
+ Math.min(nodeNum, rackNum));
+ }
+
+ // 10 replicas
+ nodeNum = 10;
+ if (datanodeCount > nodeNum) {
+ assumeTrue(datanodeCount > 2 * NODE_PER_RACK);
+ datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(getRackSize(datanodeDetails),
+ Math.min(nodeNum, rackNum));
+ }
+ }
+
+ @Test
+ public void chooseNodeWithExcludedNodes() throws SCMException {
+ int rackLevel = cluster.getMaxLevel() - 1;
+ int rackNum = cluster.getNumOfNodes(rackLevel);
+ int totalNum;
+ // test choose new datanodes for under replicated pipeline
+ // 3 replicas, two existing datanodes on same rack
+ assumeTrue(datanodeCount > NODE_PER_RACK);
+ int nodeNum = 1;
+ List<DatanodeDetails> excludedNodes = new ArrayList<>();
+
+ excludedNodes.add(datanodes.get(0));
+ excludedNodes.add(datanodes.get(1));
+ List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+ excludedNodes.get(0)));
+ Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+ excludedNodes.get(1)));
+
+ // 3 replicas, one existing datanode
+ nodeNum = 2;
+ totalNum = 3;
+ excludedNodes.clear();
+ excludedNodes.add(datanodes.get(0));
+ datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+ Math.min(totalNum, rackNum));
+
+ // 3 replicas, two existing datanodes on different rack
+ nodeNum = 1;
+ totalNum = 3;
+ excludedNodes.clear();
+ excludedNodes.add(datanodes.get(0));
+ excludedNodes.add(datanodes.get(5));
+ datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+ Math.min(totalNum, rackNum));
+
+ // 5 replicas, one existing datanode
+ nodeNum = 4;
+ totalNum = 5;
+ excludedNodes.clear();
+ excludedNodes.add(datanodes.get(0));
+ datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+ Math.min(totalNum, rackNum));
+
+ // 5 replicas, two existing datanodes on different rack
+ nodeNum = 3;
+ totalNum = 5;
+ excludedNodes.clear();
+ excludedNodes.add(datanodes.get(0));
+ excludedNodes.add(datanodes.get(5));
+ datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+ Math.min(totalNum, rackNum));
+ }
+
+ @Test
+ public void chooseNodeWithFavoredNodes() throws SCMException {
+ int nodeNum = 1;
+ List<DatanodeDetails> excludedNodes = new ArrayList<>();
+ List<DatanodeDetails> favoredNodes = new ArrayList<>();
+
+ // no excludedNodes, only favoredNodes
+ favoredNodes.add(datanodes.get(0));
+ List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, favoredNodes, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(datanodeDetails.get(0).getNetworkFullPath(),
+ favoredNodes.get(0).getNetworkFullPath());
+
+ // no overlap between excludedNodes and favoredNodes, favoredNodes can be
+ // chosen.
+ excludedNodes.clear();
+ favoredNodes.clear();
+ excludedNodes.add(datanodes.get(0));
+ favoredNodes.add(datanodes.get(1));
+ datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, favoredNodes, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(datanodeDetails.get(0).getNetworkFullPath(),
+ favoredNodes.get(0).getNetworkFullPath());
+
+ // there is overlap between excludedNodes and favoredNodes, favoredNodes
+ // should not be chosen.
+ excludedNodes.clear();
+ favoredNodes.clear();
+ excludedNodes.add(datanodes.get(0));
+ favoredNodes.add(datanodes.get(0));
+ datanodeDetails = policy.chooseDatanodes(
+ excludedNodes, favoredNodes, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertFalse(datanodeDetails.get(0).getNetworkFullPath()
+ .equals(favoredNodes.get(0).getNetworkFullPath()));
+ }
+
+ @Test
+ public void testNoInfiniteLoop() throws SCMException {
+ int nodeNum = 1;
+
+ try {
+ // request storage space larger than node capability
+ policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 0, 15);
+ fail("Storage requested exceeds capacity, this call should fail");
+ } catch (Exception e) {
+ assertTrue(e.getClass().getSimpleName().equals("SCMException"));
+ }
+
+ // get metrics
+ long totalRequest = metrics.getDatanodeRequestCount();
+ long successCount = metrics.getDatanodeChooseSuccessCount();
+ long tryCount = metrics.getDatanodeChooseAttemptCount();
+ long compromiseCount = metrics.getDatanodeChooseFallbackCount();
+
+ Assert.assertEquals(totalRequest, nodeNum);
+ Assert.assertEquals(successCount, 0);
+ MatcherAssert.assertThat("Not enough try", tryCount,
+ greaterThanOrEqualTo((long) nodeNum));
+ Assert.assertEquals(compromiseCount, 0);
+ }
+
+ @Test
+ public void testDatanodeWithDefaultNetworkLocation() throws SCMException {
+ String hostname = "node";
+ List<DatanodeInfo> dnInfoList = new ArrayList<>();
+ List<DatanodeDetails> dataList = new ArrayList<>();
+ NetworkTopology clusterMap =
+ new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+ for (int i = 0; i < 30; i++) {
+ // All nodes are created with a null location, so they all fall on the default rack
+ DatanodeDetails dn = MockDatanodeDetails.createDatanodeDetails(
+ hostname + i, null);
+ DatanodeInfo dnInfo = new DatanodeInfo(
+ dn, NodeStatus.inServiceHealthy(),
+ UpgradeUtils.defaultLayoutVersionProto());
+
+ StorageReportProto storage1 = TestUtils.createStorageReport(
+ dnInfo.getUuid(), "/data1-" + dnInfo.getUuidString(),
+ STORAGE_CAPACITY, 0, 100L, null);
+ MetadataStorageReportProto metaStorage1 =
+ TestUtils.createMetadataStorageReport(
+ "/metadata1-" + dnInfo.getUuidString(),
+ STORAGE_CAPACITY, 0, 100L, null);
+ dnInfo.updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage1)));
+ dnInfo.updateMetaDataStorageReports(
+ new ArrayList<>(Arrays.asList(metaStorage1)));
+
+ dataList.add(dn);
+ clusterMap.add(dn);
+ dnInfoList.add(dnInfo);
+ }
+ Assert.assertEquals(dataList.size(), StringUtils.countMatches(
+ clusterMap.toString(), NetConstants.DEFAULT_RACK));
+ for (DatanodeInfo dn: dnInfoList) {
+ when(nodeManager.getNodeByUuid(dn.getUuidString()))
+ .thenReturn(dn);
+ }
+
+ // choose nodes to host 5 replica
+ int nodeNum = 5;
+ SCMContainerPlacementRackScatter newPolicy =
+ new SCMContainerPlacementRackScatter(nodeManager, conf, clusterMap,
+ true, metrics);
+ List<DatanodeDetails> datanodeDetails =
+ newPolicy.chooseDatanodes(null, null, nodeNum, 0, 15);
+ Assert.assertEquals(nodeNum, datanodeDetails.size());
+ Assert.assertEquals(1, getRackSize(datanodeDetails));
+ }
+
+ @Test
+ public void testvalidateContainerPlacement() {
+ // Only run this test for the full set of DNs. 5 DNs per rack on 6 racks.
+ assumeTrue(datanodeCount >= 15);
+ List<DatanodeDetails> dns = new ArrayList<>();
+ // First 5 node are on the same rack
+ dns.add(datanodes.get(0));
+ dns.add(datanodes.get(1));
+ dns.add(datanodes.get(2));
+ ContainerPlacementStatus stat = policy.validateContainerPlacement(dns, 3);
+ assertFalse(stat.isPolicySatisfied());
+ assertEquals(2, stat.misReplicationCount());
+
+ // Pick a new list which spans 2 racks
+ dns = new ArrayList<>();
+ dns.add(datanodes.get(0));
+ dns.add(datanodes.get(1));
+ dns.add(datanodes.get(5)); // This is on second rack
+ stat = policy.validateContainerPlacement(dns, 3);
+ assertFalse(stat.isPolicySatisfied());
+ assertEquals(1, stat.misReplicationCount());
+
+ // Pick single DN, expecting 3 replica. Policy is not met.
+ dns = new ArrayList<>();
+ dns.add(datanodes.get(0));
+ stat = policy.validateContainerPlacement(dns, 3);
+ assertFalse(stat.isPolicySatisfied());
+ assertEquals(2, stat.misReplicationCount());
+
+ // Pick single DN, expecting 1 replica. Policy is met.
+ dns = new ArrayList<>();
+ dns.add(datanodes.get(0));
+ stat = policy.validateContainerPlacement(dns, 1);
+ assertTrue(stat.isPolicySatisfied());
+ assertEquals(0, stat.misReplicationCount());
+ }
+
+ @Test
+ public void testvalidateContainerPlacementSingleRackCluster() {
+ assumeTrue(datanodeCount == 5);
+
+ // All nodes are on the same rack in this test, and the cluster only has
+ // one rack.
+ List<DatanodeDetails> dns = new ArrayList<>();
+ dns.add(datanodes.get(0));
+ dns.add(datanodes.get(1));
+ dns.add(datanodes.get(2));
+ ContainerPlacementStatus stat = policy.validateContainerPlacement(dns, 3);
+ assertTrue(stat.isPolicySatisfied());
+ assertEquals(0, stat.misReplicationCount());
+
+ // Single DN - policy met as cluster only has one rack.
+ dns = new ArrayList<>();
+ dns.add(datanodes.get(0));
+ stat = policy.validateContainerPlacement(dns, 3);
+ assertTrue(stat.isPolicySatisfied());
+ assertEquals(0, stat.misReplicationCount());
+
+ // Single DN - only 1 replica expected
+ dns = new ArrayList<>();
+ dns.add(datanodes.get(0));
+ stat = policy.validateContainerPlacement(dns, 1);
+ assertTrue(stat.isPolicySatisfied());
+ assertEquals(0, stat.misReplicationCount());
+ }
+
+ private int getRackSize(List<DatanodeDetails>... datanodeDetails) {
+ Set<Node> racks = new HashSet<>();
+ for (List<DatanodeDetails> list : datanodeDetails) {
+ for (DatanodeDetails dn : list) {
+ racks.add(cluster.getAncestor(dn, 1));
+ }
+ }
+ return racks.size();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]