This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 823b505  HDDS-5326. EC: Create a new as many racks as possible placement policy for EC (#2497)
823b505 is described below

commit 823b505ab51bd37c88dac7fc060e0996967b0135
Author: Symious <[email protected]>
AuthorDate: Fri Aug 27 01:10:58 2021 +0800

    HDDS-5326. EC: Create a new as many racks as possible placement policy for EC (#2497)
---
 .../common/src/main/resources/ozone-default.xml    |   2 +-
 .../ContainerPlacementPolicyFactory.java           |   5 +-
 .../SCMContainerPlacementRackScatter.java          | 321 +++++++++++++
 .../algorithms/TestContainerPlacementFactory.java  |   3 +-
 .../TestSCMContainerPlacementRackScatter.java      | 504 +++++++++++++++++++++
 5 files changed, 832 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c26655f..7fbac28 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -823,7 +823,7 @@
   <property>
     <name>ozone.scm.container.placement.ec.impl</name>
     <value>
-      org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom
+      org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter
     </value>
     <tag>OZONE, MANAGEMENT</tag>
     <description>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index 63367e6..9357084 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -40,6 +40,9 @@ public final class ContainerPlacementPolicyFactory {
   private static final Class<? extends PlacementPolicy>
       OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT =
       SCMContainerPlacementRandom.class;
+  private static final Class<? extends PlacementPolicy>
+      OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_DEFAULT =
+      SCMContainerPlacementRackScatter.class;
 
   private ContainerPlacementPolicyFactory() {
   }
@@ -64,7 +67,7 @@ public final class ContainerPlacementPolicyFactory {
     // TODO: Change default placement policy for EC
     final Class<? extends PlacementPolicy> placementClass = conf
         .getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY,
-            OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
+            OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_DEFAULT,
             PlacementPolicy.class);
     return getPolicyInternal(placementClass, conf, nodeManager, clusterMap,
         fallback, metrics);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
new file mode 100644
index 0000000..f3b2c0c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Container placement policy that scatters datanodes across different racks,
+ * while ensuring each chosen node has enough space to satisfy the size
+ * constraints.
+ * <p>
+ * This placement policy will try to distribute datanodes on as many racks as
+ * possible.
+ * <p>
+ * This implementation is intended for network topologies like "/rack/node".
+ * It is not recommended when the network topology has more layers.
+ * <p>
+ */
+public final class SCMContainerPlacementRackScatter
+    extends SCMCommonPlacementPolicy {
+  @VisibleForTesting
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMContainerPlacementRackScatter.class);
+  private final NetworkTopology networkTopology;
+  private static final int RACK_LEVEL = 1;
+  // OUTER_LOOP limits the number of passes over all racks, to avoid an
+  // infinite loop
+  private static final int OUTER_LOOP_MAX_RETRY = 3;
+  // INNER_LOOP limits the attempts to choose a node within a single rack
+  private static final int INNER_LOOP_MAX_RETRY = 5;
+  private final SCMContainerPlacementMetrics metrics;
+
+  /**
+   * Constructs a rack-scatter container placement policy.
+   *
+   * @param nodeManager Node Manager
+   * @param conf Configuration
+   * @param networkTopology Network topology of the cluster
+   * @param fallback Whether fallback is allowed
+   * @param metrics Placement metrics
+   */
+  public SCMContainerPlacementRackScatter(final NodeManager nodeManager,
+      final ConfigurationSource conf, final NetworkTopology networkTopology,
+      boolean fallback, final SCMContainerPlacementMetrics metrics) {
+    super(nodeManager, conf);
+    this.networkTopology = networkTopology;
+    this.metrics = metrics;
+  }
+
+  /**
+   * Called by SCM to choose datanodes.
+   *
+   * @param excludedNodes - list of the datanodes to exclude.
+   * @param favoredNodes - list of preferred nodes. This is a hint to the
+   *                     allocator; whether the favored nodes are used
+   *                     depends on whether they meet the allocator's
+   *                     requirements.
+   * @param nodesRequired - number of datanodes required.
+   * @param dataSizeRequired - size required for the container.
+   * @param metadataSizeRequired - size required for Ratis metadata.
+   * @return List of datanodes.
+   * @throws SCMException if the required number of datanodes cannot be found.
+   */
+  @Override
+  public List<DatanodeDetails> chooseDatanodes(
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+      int nodesRequired, long metadataSizeRequired, long dataSizeRequired)
+      throws SCMException {
+    Preconditions.checkArgument(nodesRequired > 0);
+    metrics.incrDatanodeRequestCount(nodesRequired);
+    int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT);
+    int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
+    if (datanodeCount < nodesRequired + excludedNodesCount) {
+      throw new SCMException("Not enough datanodes to choose from. " +
+          "TotalNode = " + datanodeCount +
+          " RequiredNode = " + nodesRequired +
+          " ExcludedNode = " + excludedNodesCount, null);
+    }
+
+    List<DatanodeDetails> mutableFavoredNodes = new ArrayList<>();
+    if (favoredNodes != null) {
+      // Populate mutableFavoredNodes with only the valid favored nodes
+      for (DatanodeDetails datanodeDetails : favoredNodes) {
+        if (isValidNode(datanodeDetails, metadataSizeRequired,
+            dataSizeRequired)) {
+          mutableFavoredNodes.add(datanodeDetails);
+        }
+      }
+      Collections.shuffle(mutableFavoredNodes);
+    }
+    if (excludedNodes != null) {
+      mutableFavoredNodes.removeAll(excludedNodes);
+    }
+
+    // Sort racks that contain excluded nodes to the rear of the list
+    List<Node> racks = getAllRacks();
+    if (excludedNodes != null && excludedNodes.size() > 0) {
+      racks = sortRackWithExcludedNodes(racks, excludedNodes);
+    }
+
+    List<Node> toChooseRacks = new LinkedList<>(racks);
+    List<DatanodeDetails> chosenNodes = new ArrayList<>();
+    List<Node> unavailableNodes = new ArrayList<>();
+    Set<Node> skippedRacks = new HashSet<>();
+    if (excludedNodes != null) {
+      unavailableNodes.addAll(excludedNodes);
+    }
+
+    // If no progress is made for OUTER_LOOP_MAX_RETRY passes, give up with an exception
+    int retryCount = 0;
+    while (nodesRequired > 0) {
+      if (retryCount > OUTER_LOOP_MAX_RETRY) {
+        throw new SCMException("No satisfactory datanode found to meet the" +
+            " excludedNodes and affinityNode constraints.", null);
+      }
+      int chosenListSize = chosenNodes.size();
+
+      // Refill toChooseRacks; skippedRacks are put at the front of
+      // toChooseRacks for an even distribution
+      toChooseRacks.addAll(racks);
+      if (!skippedRacks.isEmpty()) {
+        toChooseRacks.removeAll(skippedRacks);
+        toChooseRacks.addAll(0, skippedRacks);
+        skippedRacks.clear();
+      }
+
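+      // Use remaining favored nodes first, but only when a favored node's
+      // rack is still available in this pass, so the rack-scatter property
+      // is preserved.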
+      if (mutableFavoredNodes.size() > 0) {
+        List<Node> chosenFavoredNodesInForLoop = new ArrayList<>();
+        for (DatanodeDetails favoredNode : mutableFavoredNodes) {
+          Node curRack = getRackOfDatanodeDetails(favoredNode);
+          if (toChooseRacks.contains(curRack)) {
+            chosenNodes.add(favoredNode);
+            toChooseRacks.remove(curRack);
+            chosenFavoredNodesInForLoop.add(favoredNode);
+            unavailableNodes.add(favoredNode);
+            nodesRequired--;
+            if (nodesRequired == 0) {
+              break;
+            }
+          }
+        }
+        mutableFavoredNodes.removeAll(chosenFavoredNodesInForLoop);
+      }
+
+      // If the requirement is already satisfied by favored nodes, stop here.
+      if (nodesRequired == 0) {
+        break;
+      }
+
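+      // Pick at most one node from each remaining rack in this pass, so the
+      // chosen replicas end up spread across as many racks as possible.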
+      for (Node rack : toChooseRacks) {
+        Node node = chooseNode(rack.getNetworkFullPath(), unavailableNodes,
+            metadataSizeRequired, dataSizeRequired);
+        if (node != null) {
+          chosenNodes.add((DatanodeDetails) node);
+          mutableFavoredNodes.remove(node);
+          unavailableNodes.add(node);
+          nodesRequired--;
+          if (nodesRequired == 0) {
+            break;
+          }
+        } else {
+          // Store the skipped racks so they are checked first in the next pass
+          skippedRacks.add(rack);
+        }
+      }
+      // Clear toChooseRacks for this loop
+      toChooseRacks.clear();
+
+      // If chosenNodes did not change, increase the retryCount
+      if (chosenListSize == chosenNodes.size()) {
+        retryCount++;
+      } else {
+        // If chosenNodes changed, reset the retryCount
+        retryCount = 0;
+      }
+    }
+    ContainerPlacementStatus placementStatus =
+        validateContainerPlacement(chosenNodes, nodesRequired);
+    if (!placementStatus.isPolicySatisfied()) {
+      LOG.warn("ContainerPlacementPolicy not satisfied: current rack count" +
+          " is {}, desired rack count is {}.",
+          placementStatus.actualPlacementCount(),
+          placementStatus.expectedPlacementCount());
+    }
+    return chosenNodes;
+  }
+
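+  /**
+   * Not used by this policy; node selection is handled entirely in
+   * {@link #chooseDatanodes}.
+   */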
+  @Override
+  public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+    return null;
+  }
+
+  /**
+   * Choose a datanode which meets the requirements. If no node meets all
+   * the requirements, the fallback behaviour depends on whether fallback
+   * was allowed when this class was instantiated.
+   *
+   * @param scope - the rack we are searching nodes under.
+   * @param excludedNodes - list of the datanodes to exclude. Can be null.
+   * @param dataSizeRequired - size required for the container.
+   * @param metadataSizeRequired - size required for Ratis metadata.
+   * @return the chosen datanode.
+   */
+  private Node chooseNode(String scope, List<Node> excludedNodes,
+      long metadataSizeRequired, long dataSizeRequired) {
+    int maxRetry = INNER_LOOP_MAX_RETRY;
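+    // Randomly probe nodes within the given scope until one with enough
+    // space is found, or the retry budget is exhausted.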
+    while (true) {
+      metrics.incrDatanodeChooseAttemptCount();
+      Node node = networkTopology.chooseRandom(scope, excludedNodes);
+      if (node == null) {
+        // cannot find a node which meets all constraints
+        LOG.warn("Failed to find the datanode for container. excludedNodes:" +
+            (excludedNodes == null ? "" : excludedNodes.toString()) +
+            ", rack:" + scope);
+        return null;
+      }
+
+      DatanodeDetails datanodeDetails = (DatanodeDetails)node;
+      if (isValidNode(datanodeDetails, metadataSizeRequired,
+          dataSizeRequired)) {
+        metrics.incrDatanodeChooseSuccessCount();
+        return node;
+      }
+      // exclude the unavailable node for the following retries.
+      excludedNodes.add(node);
+
+      maxRetry--;
+      if (maxRetry == 0) {
+        // avoid the infinite loop
+        String errMsg = "No satisfactory datanode found to meet the space "
+            + "constraints. metadata size required: " + metadataSizeRequired +
+            " data size required: " + dataSizeRequired;
+        LOG.info(errMsg);
+        return null;
+      }
+    }
+  }
+
+  /**
+   * For EC placement policy, desired rack count would be equal to the num of
+   * Replicas.
+   * @param numReplicas - num of Replicas.
+   * @return required rack count.
+   */
+  @Override
+  protected int getRequiredRackCount(int numReplicas) {
+    if (networkTopology == null) {
+      return 1;
+    }
+    int maxLevel = networkTopology.getMaxLevel();
+    int numRacks = networkTopology.getNumOfNodes(maxLevel - 1);
+    // Return the number of racks if it is less than numReplicas
+    return Math.min(numRacks, numReplicas);
+  }
+
+  private Node getRackOfDatanodeDetails(DatanodeDetails datanodeDetails) {
+    String location = datanodeDetails.getNetworkLocation();
+    return networkTopology.getNode(location);
+  }
+
+  /**
+   * For racks holding excludedNodes, we prefer not to choose from them, so
+   * those racks are sorted to the rear of the list.
+   * @param racks the racks to sort.
+   * @param excludedNodes the excluded datanodes.
+   * @return the racks sorted so that racks holding excluded nodes come last.
+   */
+  private List<Node> sortRackWithExcludedNodes(List<Node> racks,
+      List<DatanodeDetails> excludedNodes) {
+    Set<Node> lessPreferredRacks = excludedNodes.stream()
+        .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
+        .collect(Collectors.toSet());
+    List <Node> result = new ArrayList<>();
+    for (Node rack : racks) {
+      if (!lessPreferredRacks.contains(rack)) {
+        result.add(rack);
+      }
+    }
+    result.addAll(lessPreferredRacks);
+    return result;
+  }
+
+  private List<Node> getAllRacks() {
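+    // Racks sit one level above the leaf (datanode) level in the topology.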
+    int rackLevel = networkTopology.getMaxLevel() - 1;
+    List<Node> racks = networkTopology.getNodes(rackLevel);
+    Collections.shuffle(racks);
+    return racks;
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index b6499a6..4efc559 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -169,7 +169,8 @@ public class TestContainerPlacementFactory {
   public void testECPolicy() throws IOException {
     PlacementPolicy policy = ContainerPlacementPolicyFactory
         .getECPolicy(conf, null, null, true, null);
-    Assert.assertSame(SCMContainerPlacementRandom.class, policy.getClass());
+    Assert.assertSame(SCMContainerPlacementRackScatter.class,
+        policy.getClass());
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
new file mode 100644
index 0000000..5a31b6f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the SCM container rack scatter placement.
+ */
+@RunWith(Parameterized.class)
+public class TestSCMContainerPlacementRackScatter {
+  private NetworkTopology cluster;
+  private OzoneConfiguration conf;
+  private NodeManager nodeManager;
+  private final Integer datanodeCount;
+  private final List<DatanodeDetails> datanodes = new ArrayList<>();
+  private final List<DatanodeInfo> dnInfos = new ArrayList<>();
+  // policy with fallback capability
+  private SCMContainerPlacementRackScatter policy;
+  // node storage capacity
+  private static final long STORAGE_CAPACITY = 100L;
+  private SCMContainerPlacementMetrics metrics;
+  private static final int NODE_PER_RACK = 5;
+
+  public TestSCMContainerPlacementRackScatter(Integer count) {
+    this.datanodeCount = count;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> setupDatanodes() {
+    return Arrays.asList(new Object[][] {{3}, {4}, {5}, {6}, {7}, {8}, {9},
+        {10}, {11}, {12}, {13}, {14}, {15}, {20}, {25}, {30}});
+  }
+
+  @Before
+  public void setup() {
+    //initialize network topology instance
+    conf = new OzoneConfiguration();
+    // We are using small units here
+    conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
+        1, StorageUnit.BYTES);
+    NodeSchema[] schemas = new NodeSchema[]
+        {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+    NodeSchemaManager.getInstance().init(schemas, true);
+    cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+
+    // build datanodes, and network topology
+    String rack = "/rack";
+    String hostname = "node";
+    for (int i = 0; i < datanodeCount; i++) {
+      // Up to 6 racks, each with up to 5 datanodes
+      DatanodeDetails datanodeDetails =
+          MockDatanodeDetails.createDatanodeDetails(
+          hostname + i, rack + (i / NODE_PER_RACK));
+      DatanodeInfo datanodeInfo = new DatanodeInfo(
+          datanodeDetails, NodeStatus.inServiceHealthy(),
+          UpgradeUtils.defaultLayoutVersionProto());
+
+      StorageReportProto storage1 = TestUtils.createStorageReport(
+          datanodeInfo.getUuid(), "/data1-" + datanodeInfo.getUuidString(),
+          STORAGE_CAPACITY, 0, 100L, null);
+      MetadataStorageReportProto metaStorage1 =
+          TestUtils.createMetadataStorageReport(
+          "/metadata1-" + datanodeInfo.getUuidString(),
+          STORAGE_CAPACITY, 0, 100L, null);
+      datanodeInfo.updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage1)));
+      datanodeInfo.updateMetaDataStorageReports(
+          new ArrayList<>(Arrays.asList(metaStorage1)));
+
+      datanodes.add(datanodeDetails);
+      cluster.add(datanodeDetails);
+      dnInfos.add(datanodeInfo);
+    }
+
+    if (datanodeCount > 4) {
+      StorageReportProto storage2 = TestUtils.createStorageReport(
+          dnInfos.get(2).getUuid(),
+          "/data1-" + datanodes.get(2).getUuidString(),
+          STORAGE_CAPACITY, 90L, 10L, null);
+      dnInfos.get(2).updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage2)));
+      StorageReportProto storage3 = TestUtils.createStorageReport(
+          dnInfos.get(3).getUuid(),
+          "/data1-" + dnInfos.get(3).getUuidString(),
+          STORAGE_CAPACITY, 80L, 20L, null);
+      dnInfos.get(3).updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage3)));
+      StorageReportProto storage4 = TestUtils.createStorageReport(
+          dnInfos.get(4).getUuid(),
+          "/data1-" + dnInfos.get(4).getUuidString(),
+          STORAGE_CAPACITY, 70L, 30L, null);
+      dnInfos.get(4).updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage4)));
+    } else if (datanodeCount > 3) {
+      StorageReportProto storage2 = TestUtils.createStorageReport(
+          dnInfos.get(2).getUuid(),
+          "/data1-" + dnInfos.get(2).getUuidString(),
+          STORAGE_CAPACITY, 90L, 10L, null);
+      dnInfos.get(2).updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage2)));
+      StorageReportProto storage3 = TestUtils.createStorageReport(
+          dnInfos.get(3).getUuid(),
+          "/data1-" + dnInfos.get(3).getUuidString(),
+          STORAGE_CAPACITY, 80L, 20L, null);
+      dnInfos.get(3).updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage3)));
+    } else if (datanodeCount > 2) {
+      StorageReportProto storage2 = TestUtils.createStorageReport(
+          dnInfos.get(2).getUuid(),
+          "/data1-" + dnInfos.get(2).getUuidString(),
+          STORAGE_CAPACITY, 84L, 16L, null);
+      dnInfos.get(2).updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage2)));
+    }
+
+    // create mock node manager
+    nodeManager = Mockito.mock(NodeManager.class);
+    when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
+        .thenReturn(new ArrayList<>(datanodes));
+    for (DatanodeInfo dn: dnInfos) {
+      when(nodeManager.getNodeByUuid(dn.getUuidString()))
+          .thenReturn(dn);
+    }
+    when(nodeManager.getClusterNetworkTopologyMap())
+        .thenReturn(cluster);
+
+    // create placement policy instances
+    metrics = SCMContainerPlacementMetrics.create();
+    policy = new SCMContainerPlacementRackScatter(
+        nodeManager, conf, cluster, true, metrics);
+  }
+
+  @After
+  public void teardown() {
+    metrics.unRegister();
+  }
+
+  @Test
+  public void chooseNodeWithNoExcludedNodes() throws SCMException {
+    int rackLevel = cluster.getMaxLevel() - 1;
+    int rackNum = cluster.getNumOfNodes(rackLevel);
+
+    // test choosing new datanodes for new pipeline cases
+    // 1 replica
+    int nodeNum = 1;
+    List<DatanodeDetails> datanodeDetails =
+        policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+
+    // 2 replicas
+    nodeNum = 2;
+    datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertTrue(!cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(1)) || (datanodeCount <= NODE_PER_RACK));
+
+    //  3 replicas
+    nodeNum = 3;
+    if (datanodeCount > nodeNum) {
+      datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+      Assert.assertEquals(nodeNum, datanodeDetails.size());
+      Assert.assertEquals(getRackSize(datanodeDetails),
+          Math.min(nodeNum, rackNum));
+    }
+
+    //  5 replicas
+    nodeNum = 5;
+    if (datanodeCount > nodeNum) {
+      assumeTrue(datanodeCount >= NODE_PER_RACK);
+      datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+      Assert.assertEquals(nodeNum, datanodeDetails.size());
+      Assert.assertEquals(getRackSize(datanodeDetails),
+          Math.min(nodeNum, rackNum));
+    }
+
+    //  10 replicas
+    nodeNum = 10;
+    if (datanodeCount > nodeNum) {
+      assumeTrue(datanodeCount > 2 * NODE_PER_RACK);
+      datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+      Assert.assertEquals(nodeNum, datanodeDetails.size());
+      Assert.assertEquals(getRackSize(datanodeDetails),
+          Math.min(nodeNum, rackNum));
+    }
+  }
+
+  @Test
+  public void chooseNodeWithExcludedNodes() throws SCMException {
+    int rackLevel = cluster.getMaxLevel() - 1;
+    int rackNum = cluster.getNumOfNodes(rackLevel);
+    int totalNum;
+    // test choosing new datanodes for an under-replicated pipeline
+    // 3 replicas, two existing datanodes on the same rack
+    assumeTrue(datanodeCount > NODE_PER_RACK);
+    int nodeNum = 1;
+    List<DatanodeDetails> excludedNodes = new ArrayList<>();
+
+    excludedNodes.add(datanodes.get(0));
+    excludedNodes.add(datanodes.get(1));
+    List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        excludedNodes.get(0)));
+    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        excludedNodes.get(1)));
+
+    // 3 replicas, one existing datanode
+    nodeNum = 2;
+    totalNum = 3;
+    excludedNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+        Math.min(totalNum, rackNum));
+
+    // 3 replicas, two existing datanodes on different racks
+    nodeNum = 1;
+    totalNum = 3;
+    excludedNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    excludedNodes.add(datanodes.get(5));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+        Math.min(totalNum, rackNum));
+
+    // 5 replicas, one existing datanode
+    nodeNum = 4;
+    totalNum = 5;
+    excludedNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+        Math.min(totalNum, rackNum));
+
+    // 5 replicas, two existing datanodes on different racks
+    nodeNum = 3;
+    totalNum = 5;
+    excludedNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    excludedNodes.add(datanodes.get(5));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+        Math.min(totalNum, rackNum));
+  }
+
+  @Test
+  public void chooseNodeWithFavoredNodes() throws SCMException {
+    int nodeNum = 1;
+    List<DatanodeDetails> excludedNodes = new ArrayList<>();
+    List<DatanodeDetails> favoredNodes = new ArrayList<>();
+
+    // no excludedNodes, only favoredNodes
+    favoredNodes.add(datanodes.get(0));
+    List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, favoredNodes, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(datanodeDetails.get(0).getNetworkFullPath(),
+        favoredNodes.get(0).getNetworkFullPath());
+
+    // no overlap between excludedNodes and favoredNodes, so favoredNodes can
+    // be chosen.
+    excludedNodes.clear();
+    favoredNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    favoredNodes.add(datanodes.get(1));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, favoredNodes, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(datanodeDetails.get(0).getNetworkFullPath(),
+        favoredNodes.get(0).getNetworkFullPath());
+
+    // there is overlap between excludedNodes and favoredNodes, favoredNodes
+    // should not be chosen.
+    excludedNodes.clear();
+    favoredNodes.clear();
+    excludedNodes.add(datanodes.get(0));
+    favoredNodes.add(datanodes.get(0));
+    datanodeDetails = policy.chooseDatanodes(
+        excludedNodes, favoredNodes, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertFalse(datanodeDetails.get(0).getNetworkFullPath()
+        .equals(favoredNodes.get(0).getNetworkFullPath()));
+  }
+
+  @Test
+  public void testNoInfiniteLoop() throws SCMException {
+    int nodeNum = 1;
+
+    try {
+      // request storage space larger than the node capacity
+      policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 0, 15);
+      fail("Storage requested exceeds capacity, this call should fail");
+    } catch (Exception e) {
+      assertTrue(e.getClass().getSimpleName().equals("SCMException"));
+    }
+
+    // get metrics
+    long totalRequest = metrics.getDatanodeRequestCount();
+    long successCount = metrics.getDatanodeChooseSuccessCount();
+    long tryCount = metrics.getDatanodeChooseAttemptCount();
+    long compromiseCount = metrics.getDatanodeChooseFallbackCount();
+
+    Assert.assertEquals(totalRequest, nodeNum);
+    Assert.assertEquals(successCount, 0);
+    MatcherAssert.assertThat("Not enough tries", tryCount,
+        greaterThanOrEqualTo((long) nodeNum));
+    Assert.assertEquals(compromiseCount, 0);
+  }
+
+  @Test
+  public void testDatanodeWithDefaultNetworkLocation() throws SCMException {
+    String hostname = "node";
+    List<DatanodeInfo> dnInfoList = new ArrayList<>();
+    List<DatanodeDetails> dataList = new ArrayList<>();
+    NetworkTopology clusterMap =
+        new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+    for (int i = 0; i < 30; i++) {
+      // 30 datanodes, all using the default network location (single rack)
+      DatanodeDetails dn = MockDatanodeDetails.createDatanodeDetails(
+          hostname + i, null);
+      DatanodeInfo dnInfo = new DatanodeInfo(
+          dn, NodeStatus.inServiceHealthy(),
+          UpgradeUtils.defaultLayoutVersionProto());
+
+      StorageReportProto storage1 = TestUtils.createStorageReport(
+          dnInfo.getUuid(), "/data1-" + dnInfo.getUuidString(),
+          STORAGE_CAPACITY, 0, 100L, null);
+      MetadataStorageReportProto metaStorage1 =
+          TestUtils.createMetadataStorageReport(
+          "/metadata1-" + dnInfo.getUuidString(),
+          STORAGE_CAPACITY, 0, 100L, null);
+      dnInfo.updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage1)));
+      dnInfo.updateMetaDataStorageReports(
+          new ArrayList<>(Arrays.asList(metaStorage1)));
+
+      dataList.add(dn);
+      clusterMap.add(dn);
+      dnInfoList.add(dnInfo);
+    }
+    Assert.assertEquals(dataList.size(), StringUtils.countMatches(
+        clusterMap.toString(), NetConstants.DEFAULT_RACK));
+    for (DatanodeInfo dn: dnInfoList) {
+      when(nodeManager.getNodeByUuid(dn.getUuidString()))
+          .thenReturn(dn);
+    }
+
+    // choose nodes to host 5 replicas
+    int nodeNum = 5;
+    SCMContainerPlacementRackScatter newPolicy =
+        new SCMContainerPlacementRackScatter(nodeManager, conf, clusterMap,
+            true, metrics);
+    List<DatanodeDetails> datanodeDetails =
+        newPolicy.chooseDatanodes(null, null, nodeNum, 0, 15);
+    Assert.assertEquals(nodeNum, datanodeDetails.size());
+    Assert.assertEquals(1, getRackSize(datanodeDetails));
+  }
+
+  @Test
+  public void testValidateContainerPlacement() {
+    // Only run this test when there are at least 15 DNs (3 or more full racks).
+    assumeTrue(datanodeCount >= 15);
+    List<DatanodeDetails> dns = new ArrayList<>();
+    // The first 5 nodes are on the same rack
+    dns.add(datanodes.get(0));
+    dns.add(datanodes.get(1));
+    dns.add(datanodes.get(2));
+    ContainerPlacementStatus stat = policy.validateContainerPlacement(dns, 3);
+    assertFalse(stat.isPolicySatisfied());
+    assertEquals(2, stat.misReplicationCount());
+
+    // Pick a new list which spans 2 racks
+    dns = new ArrayList<>();
+    dns.add(datanodes.get(0));
+    dns.add(datanodes.get(1));
+    dns.add(datanodes.get(5)); // This is on second rack
+    stat = policy.validateContainerPlacement(dns, 3);
+    assertFalse(stat.isPolicySatisfied());
+    assertEquals(1, stat.misReplicationCount());
+
+    // Pick single DN, expecting 3 replica. Policy is not met.
+    dns = new ArrayList<>();
+    dns.add(datanodes.get(0));
+    stat = policy.validateContainerPlacement(dns, 3);
+    assertFalse(stat.isPolicySatisfied());
+    assertEquals(2, stat.misReplicationCount());
+
+    // Pick single DN, expecting 1 replica. Policy is met.
+    dns = new ArrayList<>();
+    dns.add(datanodes.get(0));
+    stat = policy.validateContainerPlacement(dns, 1);
+    assertTrue(stat.isPolicySatisfied());
+    assertEquals(0, stat.misReplicationCount());
+  }
+
+  @Test
+  public void testValidateContainerPlacementSingleRackCluster() {
+    assumeTrue(datanodeCount == 5);
+
+    // All nodes are on the same rack in this test, and the cluster only has
+    // one rack.
+    List<DatanodeDetails> dns = new ArrayList<>();
+    dns.add(datanodes.get(0));
+    dns.add(datanodes.get(1));
+    dns.add(datanodes.get(2));
+    ContainerPlacementStatus stat = policy.validateContainerPlacement(dns, 3);
+    assertTrue(stat.isPolicySatisfied());
+    assertEquals(0, stat.misReplicationCount());
+
+    // Single DN - policy met as cluster only has one rack.
+    dns = new ArrayList<>();
+    dns.add(datanodes.get(0));
+    stat = policy.validateContainerPlacement(dns, 3);
+    assertTrue(stat.isPolicySatisfied());
+    assertEquals(0, stat.misReplicationCount());
+
+    // Single DN - only 1 replica expected
+    dns = new ArrayList<>();
+    dns.add(datanodes.get(0));
+    stat = policy.validateContainerPlacement(dns, 1);
+    assertTrue(stat.isPolicySatisfied());
+    assertEquals(0, stat.misReplicationCount());
+  }
+
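+  // Counts the number of distinct racks covered by the given datanode lists.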
+  private int getRackSize(List<DatanodeDetails>... datanodeDetails) {
+    Set<Node> racks = new HashSet<>();
+    for (List<DatanodeDetails> list : datanodeDetails) {
+      for (DatanodeDetails dn : list) {
+        racks.add(cluster.getAncestor(dn, 1));
+      }
+    }
+    return racks.size();
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
