This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eed1ec3d3 HDDS-7265. ScatterRackPolicy for Ratis pipeline provider 
container placement (#4002)
6eed1ec3d3 is described below

commit 6eed1ec3d3e68e30861964a2dff042bd9b37be0b
Author: Neil Joshi <[email protected]>
AuthorDate: Fri Dec 9 15:06:50 2022 -0700

    HDDS-7265. ScatterRackPolicy for Ratis pipeline provider container 
placement (#4002)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   2 +
 .../SCMContainerPlacementRackScatter.java          |  26 ++-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  20 +-
 .../pipeline/PipelinePlacementPolicyFactory.java   |  52 +++++
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  34 +++-
 .../TestSCMContainerPlacementRackScatter.java      |  40 ++++
 .../scm/pipeline/TestPipelinePlacementFactory.java | 215 +++++++++++++++++++++
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  15 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  22 ++-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   3 +-
 10 files changed, 410 insertions(+), 19 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1c4e09ead9..c324c3e841 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -344,6 +344,8 @@ public final class ScmConfigKeys {
 
   public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
       "ozone.scm.container.placement.impl";
+  public static final String OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY =
+      "ozone.scm.pipeline.placement.impl";
   public static final String OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY =
       "ozone.scm.container.placement.ec.impl";
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index 2f86df8708..6b4f9d9116 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +82,19 @@ public final class SCMContainerPlacementRackScatter
     this.metrics = metrics;
   }
 
+  /**
+   * Constructor for Pipeline Provider Pipeline Placement with rack awareness.
+   * @param nodeManager Node Manager
+   * @param stateManager State Manager
+   * @param conf Configuration
+   */
+  public SCMContainerPlacementRackScatter(NodeManager nodeManager,
+      PipelineStateManager stateManager, ConfigurationSource conf) {
+    super(nodeManager, conf);
+    this.networkTopology = nodeManager.getClusterNetworkTopologyMap();
+    this.metrics = null;
+  }
+
   public Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
        List<Node> unavailableNodes,
        List<DatanodeDetails> mutableFavoredNodes,
@@ -205,7 +219,9 @@ public final class SCMContainerPlacementRackScatter
           "than 0, but the given num is " + nodesRequiredToChoose;
       throw new SCMException(errorMsg, null);
     }
-    metrics.incrDatanodeRequestCount(nodesRequiredToChoose);
+    if (metrics != null) {
+      metrics.incrDatanodeRequestCount(nodesRequiredToChoose);
+    }
     int nodesRequired = nodesRequiredToChoose;
     int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
     List<Node> availableNodes = networkTopology.getNodes(
@@ -363,7 +379,9 @@ public final class SCMContainerPlacementRackScatter
       long metadataSizeRequired, long dataSizeRequired) {
     int maxRetry = INNER_LOOP_MAX_RETRY;
     while (true) {
-      metrics.incrDatanodeChooseAttemptCount();
+      if (metrics != null) {
+        metrics.incrDatanodeChooseAttemptCount();
+      }
       Node node = null;
       try {
         node = networkTopology.chooseRandom(scope, excludedNodes);
@@ -379,7 +397,9 @@ public final class SCMContainerPlacementRackScatter
         DatanodeDetails datanodeDetails = (DatanodeDetails) node;
         if (isValidNode(datanodeDetails, metadataSizeRequired,
             dataSizeRequired)) {
-          metrics.incrDatanodeChooseSuccessCount();
+          if (metrics != null) {
+            metrics.incrDatanodeChooseSuccessCount();
+          }
           return node;
         }
         // exclude the unavailable node for the following retries.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 2287475d82..80374299b6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -83,7 +83,10 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
     this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
   }
 
-  int currentRatisThreePipelineCount(DatanodeDetails datanodeDetails) {
+  public static int currentRatisThreePipelineCount(
+      NodeManager nodeManager,
+      PipelineStateManager stateManager,
+      DatanodeDetails datanodeDetails) {
     // Safe to cast collection's size to int
     return (int) nodeManager.getPipelines(datanodeDetails).stream()
         .map(id -> {
@@ -95,12 +98,12 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
             return null;
           }
         })
-        .filter(this::isNonClosedRatisThreePipeline)
+        .filter(PipelinePlacementPolicy::isNonClosedRatisThreePipeline)
         .count();
   }
 
-  private boolean isNonClosedRatisThreePipeline(Pipeline p) {
-    return p.getReplicationConfig()
+  private static boolean isNonClosedRatisThreePipeline(Pipeline p) {
+    return p != null && p.getReplicationConfig()
         .equals(RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
         && !p.isClosed();
   }
@@ -163,7 +166,8 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
     // TODO check if sorting could cause performance issue: HDDS-3466.
     List<DatanodeDetails> healthyList = healthyNodes.stream()
         .map(d ->
-            new DnWithPipelines(d, currentRatisThreePipelineCount(d)))
+            new DnWithPipelines(d, currentRatisThreePipelineCount(nodeManager,
+                stateManager, d)))
         .filter(d ->
             (d.getPipelines() < nodeManager.pipelineLimit(d.getDn())))
         .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
@@ -478,7 +482,11 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
     return REQUIRED_RACKS;
   }
 
-  private static class DnWithPipelines {
+  /**
+   * Static nested utility class holding a datanode and its pipeline count,
+   * used for pipeline engagement checking.
+   */
+  public static class DnWithPipelines {
     private DatanodeDetails dn;
     private int pipelines;
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicyFactory.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicyFactory.java
new file mode 100644
index 0000000000..c57448fdab
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicyFactory.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
+
+/**
+ * Pipeline placement factory for pipeline providers to create placement 
instance
+ * based on configuration property.
+ * {@link ScmConfigKeys#OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY}
+ */
+public final class PipelinePlacementPolicyFactory {
+
+  private PipelinePlacementPolicyFactory() {
+  }
+
+  public static PlacementPolicy getPolicy(NodeManager nodeManager,
+      PipelineStateManager stateManager, ConfigurationSource conf) {
+    final Class<? extends PlacementPolicy> clazz
+        = conf.getClass(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+        PipelinePlacementPolicy.class, PlacementPolicy.class);
+
+    try {
+      return clazz.getDeclaredConstructor(NodeManager.class,
+              PipelineStateManager.class, ConfigurationSource.class)
+          .newInstance(nodeManager, stateManager, conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to getPolicy for " + clazz, e);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 43b2e01c91..ad149fdec6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -36,7 +37,9 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import 
org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicy.DnWithPipelines;
 import 
org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy;
 import 
org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicyFactory;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -59,7 +62,7 @@ public class RatisPipelineProvider
 
   private final ConfigurationSource conf;
   private final EventPublisher eventPublisher;
-  private final PipelinePlacementPolicy placementPolicy;
+  private final PlacementPolicy placementPolicy;
   private int pipelineNumberLimit;
   private int maxPipelinePerDatanode;
   private final LeaderChoosePolicy leaderChoosePolicy;
@@ -77,8 +80,8 @@ public class RatisPipelineProvider
     this.conf = conf;
     this.eventPublisher = eventPublisher;
     this.scmContext = scmContext;
-    this.placementPolicy =
-        new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+    this.placementPolicy = PipelinePlacementPolicyFactory
+        .getPolicy(nodeManager, stateManager, conf);
     this.pipelineNumberLimit = conf.getInt(
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
@@ -163,6 +166,14 @@ public class RatisPipelineProvider
           containerSizeBytes);
       break;
     case THREE:
+      List<DatanodeDetails> excludeDueToEngagement = 
filterPipelineEngagement();
+      if (excludeDueToEngagement.size() > 0) {
+        if (excludedNodes.size() == 0) {
+          excludedNodes = excludeDueToEngagement;
+        } else {
+          excludedNodes.addAll(excludeDueToEngagement);
+        }
+      }
       dns = placementPolicy.chooseDatanodes(excludedNodes,
           favoredNodes, factor.getNumber(), minRatisVolumeSizeBytes,
           containerSizeBytes);
@@ -222,6 +233,23 @@ public class RatisPipelineProvider
         .collect(Collectors.toList()));
   }
 
+  private List<DatanodeDetails> filterPipelineEngagement() {
+    List<DatanodeDetails> healthyNodes =
+        getNodeManager().getNodes(NodeStatus.inServiceHealthy());
+    List<DatanodeDetails> excluded = healthyNodes.stream()
+        .map(d ->
+            new DnWithPipelines(d,
+                PipelinePlacementPolicy
+                    .currentRatisThreePipelineCount(getNodeManager(),
+                    getPipelineStateManager(), d)))
+        .filter(d ->
+            (d.getPipelines() >= getNodeManager().pipelineLimit(d.getDn())))
+        .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
+        .map(d -> d.getDn())
+        .collect(Collectors.toList());
+    return excluded;
+  }
+
   @Override
   public void shutdown() {
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
index b49037e583..815c70ffb3 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
@@ -58,6 +58,7 @@ import java.util.stream.Stream;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
 import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
@@ -556,6 +557,45 @@ public class TestSCMContainerPlacementRackScatter {
     assertEquals(misReplication, stat.misReplicationCount());
   }
 
+  @Test
+  public void testPipelineProviderRackScatter() throws SCMException {
+    setup(3, 1);
+    conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementRackScatter.class.getCanonicalName());
+    List<DatanodeDetails> usedDns = new ArrayList<>();
+    List<DatanodeDetails> excludedDns = new ArrayList<>();
+    List<DatanodeDetails> additionalNodes = policy.chooseDatanodes(usedDns,
+        excludedDns, null, 3, 0, 5);
+    assertPlacementPolicySatisfied(usedDns, additionalNodes, excludedDns, 3,
+        true, 0);
+  }
+
+  // Test for pipeline provider placement when the number of racks is less
+  // than the number of nodes required, so the nodes cannot be fully
+  // scattered.  In this case the placement spreads the nodes as much as
+  // possible: with 3 nodes required and 2 racks, 2 nodes are placed in one
+  // rack and 1 in the other; with only 1 rack, all nodes share that rack.
+  @Test
+  public void testPipelineProviderRackScatterFallback() throws SCMException {
+    setup(3, 2);
+    conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementRackScatter.class.getCanonicalName());
+    List<DatanodeDetails> usedDns = new ArrayList<>();
+    List<DatanodeDetails> excludedDns = new ArrayList<>();
+    List<DatanodeDetails> additionalNodes = policy.chooseDatanodes(usedDns,
+        excludedDns, null, 3, 0, 5);
+    assertPlacementPolicySatisfied(usedDns, additionalNodes, excludedDns, 3,
+        true, 0);
+
+    setup(3, 3);
+    additionalNodes = policy.chooseDatanodes(usedDns,
+        excludedDns, null, 3, 0, 5);
+    assertPlacementPolicySatisfied(usedDns, additionalNodes, excludedDns, 3,
+        true, 0);
+  }
+
+  // add test for pipeline engagement
+
   @Test
   public void testValidChooseNodesWithUsedNodes() throws SCMException {
     setup(5, 2);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java
new file mode 100644
index 0000000000..9d7bae4810
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.io.File;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for PipelinePlacementPolicyFactory.
+ */
+public class TestPipelinePlacementFactory {
+  private OzoneConfiguration conf;
+  private NodeManager nodeManager;
+  private NodeManager nodeManagerBase;
+  private PipelineStateManager stateManager;
+  private NetworkTopologyImpl cluster;
+  private final List<DatanodeDetails> datanodes = new ArrayList<>();
+  private final List<DatanodeInfo> dnInfos = new ArrayList<>();
+  private File testDir;
+  private DBStore dbStore;
+  private SCMHAManager scmhaManager;
+
+  private static final long STORAGE_CAPACITY = 100L;
+
+  @BeforeEach
+  public void setup() {
+    //initialize ozone config for tests
+    conf = new OzoneConfiguration();
+  }
+
+  private void setupRacks(int datanodeCount, int nodesPerRack)
+      throws Exception {
+    conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
+        1, StorageUnit.BYTES);
+    NodeSchema[] schemas = new NodeSchema[]
+        {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+    NodeSchemaManager.getInstance().init(schemas, true);
+    cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+
+    // build datanodes, and network topology
+    String rack = "/rack";
+    String hostname = "node";
+    for (int i = 0; i < datanodeCount; i++) {
+      DatanodeDetails datanodeDetails =
+          MockDatanodeDetails.createDatanodeDetails(
+              hostname + i, rack + (i / nodesPerRack));
+
+      datanodes.add(datanodeDetails);
+      cluster.add(datanodeDetails);
+      DatanodeInfo datanodeInfo = new DatanodeInfo(
+          datanodeDetails, NodeStatus.inServiceHealthy(),
+          UpgradeUtils.defaultLayoutVersionProto());
+
+      StorageContainerDatanodeProtocolProtos.StorageReportProto storage1 =
+          HddsTestUtils.createStorageReport(
+          datanodeInfo.getUuid(), "/data1-" + datanodeInfo.getUuidString(),
+          STORAGE_CAPACITY, 0, 100L, null);
+      StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto
+          metaStorage1 =
+          HddsTestUtils.createMetadataStorageReport(
+              "/metadata1-" + datanodeInfo.getUuidString(),
+              STORAGE_CAPACITY, 0, 100L, null);
+      datanodeInfo.updateStorageReports(
+          new ArrayList<>(Arrays.asList(storage1)));
+      datanodeInfo.updateMetaDataStorageReports(
+          new ArrayList<>(Arrays.asList(metaStorage1)));
+      dnInfos.add(datanodeInfo);
+    }
+    nodeManagerBase = new MockNodeManager(cluster, datanodes,
+        false, 10);
+    nodeManager = Mockito.spy(nodeManagerBase);
+    for (DatanodeInfo dn: dnInfos) {
+      when(nodeManager.getNodeByUuid(dn.getUuidString()))
+          .thenReturn(dn);
+    }
+
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    scmhaManager = SCMHAManagerStub.getInstance(true);
+
+    stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+  }
+
+  @Test
+  public void testDefaultPolicy() throws IOException {
+    PlacementPolicy policy = PipelinePlacementPolicyFactory
+        .getPolicy(null, null, conf);
+    Assertions.assertSame(PipelinePlacementPolicy.class, policy.getClass());
+  }
+
+  @Test
+  public void testRackScatterPolicy() throws Exception {
+    conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementRackScatter.class.getCanonicalName());
+    // for this test, rack setup does not matter, just
+    // need a non-null NetworkTopologyMap within the nodeManager
+    setupRacks(6, 3);
+    PlacementPolicy policy = PipelinePlacementPolicyFactory
+        .getPolicy(nodeManager, stateManager, conf);
+    Assertions.assertSame(SCMContainerPlacementRackScatter.class,
+        policy.getClass());
+  }
+
+  // test default rack aware pipeline provider placement - 3 racks
+  // pipeline created with 2 nodes sharing one rack and the remaining
+  // node on a different rack
+  @Test
+  public void testDefaultPipelineProviderRackPlacement() throws Exception {
+    setupRacks(6, 2);
+    PlacementPolicy policy = PipelinePlacementPolicyFactory
+        .getPolicy(nodeManager, stateManager, conf);
+
+    int nodeNum = 3;
+    List<DatanodeDetails> datanodeDetails =
+        policy.chooseDatanodes(null, null, nodeNum, 15, 15);
+    Assertions.assertEquals(nodeNum, datanodeDetails.size());
+    Assertions.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(2)));
+    Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(1)));
+    Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
+        datanodeDetails.get(2)));
+  }
+
+  // test rack scatter pipeline provider placement - 3 racks
+  // pipeline created with node on each rack
+  @Test
+  public void testRackScatterPipelineProviderRackPlacement() throws Exception {
+    conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementRackScatter.class.getCanonicalName());
+
+    setupRacks(6, 2);
+    PlacementPolicy policy = PipelinePlacementPolicyFactory
+        .getPolicy(nodeManager, stateManager, conf);
+
+    int nodeNum = 3;
+    List<DatanodeDetails> excludedNodes = new ArrayList<>();
+    List<DatanodeDetails> favoredNodes = new ArrayList<>();
+    List<DatanodeDetails> datanodeDetails =
+        policy.chooseDatanodes(excludedNodes, excludedNodes, favoredNodes,
+            nodeNum, 15, 15);
+    Assertions.assertEquals(nodeNum, datanodeDetails.size());
+    Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(2)));
+    Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+        datanodeDetails.get(1)));
+    Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
+        datanodeDetails.get(2)));
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 9d5cadeb2d..82a2ab3246 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -692,7 +692,8 @@ public class TestPipelinePlacementPolicy {
     createPipelineWithReplicationConfig(standaloneOneDn, STAND_ALONE, ONE);
 
     pipelineCount
-        = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(0));
+        = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+        stateManager, healthyNodes.get(0));
     Assertions.assertEquals(pipelineCount, 0);
 
     // Check datanode with one RATIS/ONE pipeline
@@ -701,7 +702,8 @@ public class TestPipelinePlacementPolicy {
     createPipelineWithReplicationConfig(ratisOneDn, RATIS, ONE);
 
     pipelineCount
-        = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(1));
+        = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+        stateManager, healthyNodes.get(1));
     Assertions.assertEquals(pipelineCount, 0);
 
     // Check datanode with one RATIS/THREE pipeline
@@ -712,7 +714,8 @@ public class TestPipelinePlacementPolicy {
     createPipelineWithReplicationConfig(ratisThreeDn, RATIS, THREE);
 
     pipelineCount
-        = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(2));
+        = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+        stateManager, healthyNodes.get(2));
     Assertions.assertEquals(pipelineCount, 1);
 
     // Check datanode with one RATIS/ONE and one STANDALONE/ONE pipeline
@@ -721,7 +724,8 @@ public class TestPipelinePlacementPolicy {
     createPipelineWithReplicationConfig(standaloneOneDn, STAND_ALONE, ONE);
 
     pipelineCount
-        = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(1));
+        = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+        stateManager, healthyNodes.get(1));
     Assertions.assertEquals(pipelineCount, 0);
 
     // Check datanode with one RATIS/ONE and one STANDALONE/ONE pipeline and
@@ -734,7 +738,8 @@ public class TestPipelinePlacementPolicy {
     createPipelineWithReplicationConfig(ratisThreeDn, RATIS, THREE);
 
     pipelineCount
-        = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(1));
+        = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+        stateManager, healthyNodes.get(1));
     Assertions.assertEquals(pipelineCount, 2);
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index b788fd713e..1d3d1ae473 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -58,8 +59,10 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.apache.commons.collections.CollectionUtils.intersection;
-import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -316,6 +319,23 @@ public class TestRatisPipelineProvider {
     }
   }
 
+  @Test
+  // Test pipeline provider with RackScatter policy cannot create
+  // pipeline due to nodes with full pipeline engagement.
+  public void testFactorTHREEPipelineRackScatterEngagement()
+      throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementRackScatter.class.getCanonicalName());
+    conf.set(OZONE_DATANODE_PIPELINE_LIMIT, "0");
+    init(0, conf);
+    List<DatanodeDetails> excludedNodes = new ArrayList<>();
+
+    Assertions.assertThrows(SCMException.class, () ->
+        provider.create(RatisReplicationConfig
+                .getInstance(ReplicationFactor.THREE),
+            excludedNodes, Collections.EMPTY_LIST));
+  }
 
   @Test
   public void testCreatePipelinesWhenNotEnoughSpace() throws Exception {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 137ae08dad..502493eb40 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -132,7 +132,8 @@ public class TestOzoneConfigurationFields extends 
TestConfigurationFieldsBase {
         OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
         OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY,
         OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
-        OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD
+        OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY
     ));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to