This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 73e6f90aae HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space (#5354)
73e6f90aae is described below

commit 73e6f90aae4dc22a265361b9a63ccb25a5e6919a
Author: Hongbing Wang <[email protected]>
AuthorDate: Tue Jan 23 01:11:32 2024 +0800

    HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space (#5354)
---
 .../java/org/apache/hadoop/hdds/scm/ScmConfig.java |  32 +++--
 .../hadoop/hdds/scm/PipelineChoosePolicy.java      |  10 ++
 .../container/placement/metrics/SCMNodeMetric.java |  16 ++-
 .../container/placement/metrics/SCMNodeStat.java   |   9 ++
 .../algorithms/CapacityPipelineChoosePolicy.java   | 136 +++++++++++++++++++++
 .../algorithms/PipelineChoosePolicyFactory.java    |  10 +-
 .../hdds/scm/server/StorageContainerManager.java   |   4 +-
 .../pipeline/TestWritableECContainerProvider.java  |  24 +++-
 .../TestCapacityPipelineChoosePolicy.java          | 107 ++++++++++++++++
 .../TestPipelineChoosePolicyFactory.java           |  19 +--
 10 files changed, 340 insertions(+), 27 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
index 46816a63d3..2fc04e00f2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
@@ -73,8 +73,17 @@ public class ScmConfig extends ReconfigurableConfig {
           + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
           + "The class decides which pipeline will be used to find or "
           + "allocate Ratis containers. If not set, "
-          + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
-          + "RandomPipelineChoosePolicy will be used as default value."
+          + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "RandomPipelineChoosePolicy will be used as default value. "
+          + "The following values can be used, "
+          + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "RandomPipelineChoosePolicy : random choose one pipeline. "
+          + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. 
"
+          + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
+          + "utilization from the two pipelines. Note that random choose "
+          + "method will be executed twice in this policy."
   )
   private String pipelineChoosePolicyName;
 
@@ -85,11 +94,20 @@ public class ScmConfig extends ReconfigurableConfig {
       tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
       description =
           "The full name of class which implements "
-              + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
-              + "The class decides which pipeline will be used when "
-              + "selecting an EC Pipeline. If not set, "
-              + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
-              + "RandomPipelineChoosePolicy will be used as default value."
+          + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+          + "The class decides which pipeline will be used when "
+          + "selecting an EC Pipeline. If not set, "
+          + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "RandomPipelineChoosePolicy will be used as default value. "
+          + "The following values can be used, "
+          + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "RandomPipelineChoosePolicy : random choose one pipeline. "
+          + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. 
"
+          + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+          + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
+          + "utilization from the two pipelines. Note that random choose "
+          + "method will be executed twice in this policy."
   )
   private String ecPipelineChoosePolicyName;
 
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
similarity index 86%
rename from 
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
rename to 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
index 76439a7846..e1d0fdd35a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.scm;
 
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
 import java.util.List;
@@ -26,6 +27,15 @@ import java.util.List;
  */
 public interface PipelineChoosePolicy {
 
+  /**
+   * Updates the policy with NodeManager.
+   * @return updated policy.
+   */
+  default PipelineChoosePolicy init(final NodeManager nodeManager) {
+    // override if the policy requires nodeManager
+    return this;
+  }
+
   /**
    * Given an initial list of pipelines, return one of the pipelines.
    *
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
index 330bf67416..094e535dcb 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
@@ -23,7 +23,8 @@ import com.google.common.base.Preconditions;
 /**
  * SCM Node Metric that is used in the placement classes.
  */
-public class SCMNodeMetric  implements DatanodeMetric<SCMNodeStat, Long> {
+public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long>,
+    Comparable<SCMNodeMetric> {
   private SCMNodeStat stat;
 
   /**
@@ -195,12 +196,12 @@ public class SCMNodeMetric  implements 
DatanodeMetric<SCMNodeStat, Long> {
    * @throws ClassCastException   if the specified object's type prevents it
    *                              from being compared to this object.
    */
-  //@Override
-  public int compareTo(SCMNodeStat o) {
-    if (isEqual(o)) {
+  @Override
+  public int compareTo(SCMNodeMetric o) {
+    if (isEqual(o.get())) {
       return 0;
     }
-    if (isGreater(o)) {
+    if (isGreater(o.get())) {
       return 1;
     } else {
       return -1;
@@ -225,4 +226,9 @@ public class SCMNodeMetric  implements 
DatanodeMetric<SCMNodeStat, Long> {
   public int hashCode() {
     return stat != null ? stat.hashCode() : 0;
   }
+
+  @Override
+  public String toString() {
+    return "SCMNodeMetric{" + stat.toString() + '}';
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
index 2a848a04ef..5456e6ee52 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
@@ -174,4 +174,13 @@ public class SCMNodeStat implements NodeStat {
     return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get() ^
         committed.get() ^ freeSpaceToSpare.get());
   }
+
+  @Override
+  public String toString() {
+    return "SCMNodeStat{" +
+        "capacity=" + capacity.get() +
+        ", scmUsed=" + scmUsed.get() +
+        ", remaining=" + remaining.get() +
+        '}';
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java
new file mode 100644
index 0000000000..a95a473de6
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Pipeline choose policy that randomly chooses a pipeline with relatively
+ * lower utilization.
+ * <p>
+ * The algorithm is as follows: pick 2 random pipelines from a given pool of
+ * pipelines and then pick the pipeline which has lower utilization.
+ * This leads to a higher probability of pipelines with lower utilization
+ * being picked.
+ * <p>
+ * For those wondering why we pick two pipelines at random and then choose
+ * the one with lower utilization: links to the original papers are in
+ * HDFS-11564.
+ * Also, the same algorithm applies to SCMContainerPlacementCapacity.
+ * <p>
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy() {
+    healthPolicy = new HealthyPipelineChoosePolicy();
+  }
+
+  @Override
+  public PipelineChoosePolicy init(final NodeManager scmNodeManager) {
+    this.nodeManager = scmNodeManager;
+    return this;
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+
+    int result = new CapacityPipelineComparator(this)
+        .compare(pipeline1, pipeline2);
+
+    LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
+    return result <= 0 ? pipeline1 : pipeline2;
+  }
+
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+    Pipeline pipeline = choosePipeline(mutableList, pri);
+    return pipelineList.indexOf(pipeline);
+  }
+
+  /**
+   * Return a list of SCMNodeMetrics corresponding to the DataNodes in the
+   * pipeline, sorted in descending order based on scm used storage.
+   * @param pipeline pipeline
+   * @return sorted SCMNodeMetrics corresponding to the pipeline
+   */
+  private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
+    Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
+    pipeline.getNodes().stream()
+        .map(nodeManager::getNodeStat)
+        .filter(Objects::nonNull)
+        .sorted()
+        .forEach(sortedNodeStack::push);
+    return sortedNodeStack;
+  }
+
+  static class CapacityPipelineComparator implements Comparator<Pipeline> {
+    private final CapacityPipelineChoosePolicy policy;
+
+    CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
+      this.policy = policy;
+    }
+    @Override
+    public int compare(Pipeline p1, Pipeline p2) {
+      if (p1.getId().equals(p2.getId())) {
+        LOG.debug("Compare the same pipeline {}", p1);
+        return 0;
+      }
+      Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
+      Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);
+
+      // Compare the scmUsed weight of the node in the two sorted node stacks
+      LOG.debug("Compare scmUsed weight in pipelines, first : {}, second : {}",
+          sortedNodes1, sortedNodes2);
+      int result = 0;
+      int count = 0;
+      while (result == 0 &&
+          !sortedNodes1.isEmpty() && !sortedNodes2.isEmpty()) {
+        count++;
+        LOG.debug("Compare {} round", count);
+        result = sortedNodes1.pop().compareTo(sortedNodes2.pop());
+      }
+      return result;
+    }
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
index d040dbe2bc..90736a0181 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,14 +49,14 @@ public final class PipelineChoosePolicyFactory {
   private PipelineChoosePolicyFactory() {
   }
 
-  public static PipelineChoosePolicy getPolicy(
+  public static PipelineChoosePolicy getPolicy(final NodeManager nodeManager,
       ScmConfig scmConfig, boolean forEC) throws SCMException {
     Class<? extends PipelineChoosePolicy> policyClass = null;
     String policyName = forEC ? scmConfig.getECPipelineChoosePolicyName() :
         scmConfig.getPipelineChoosePolicyName();
     try {
       policyClass = getClass(policyName, PipelineChoosePolicy.class);
-      return createPipelineChoosePolicyFromClass(policyClass);
+      return createPipelineChoosePolicyFromClass(nodeManager, policyClass);
     } catch (Exception e) {
       Class<? extends PipelineChoosePolicy> defaultPolicy = forEC ?
           OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT :
@@ -64,13 +65,14 @@ public final class PipelineChoosePolicyFactory {
         LOG.error("Met an exception while create pipeline choose policy "
             + "for the given class {}. Fallback to the default pipeline "
             + " choose policy {}", policyName, defaultPolicy, e);
-        return createPipelineChoosePolicyFromClass(defaultPolicy);
+        return createPipelineChoosePolicyFromClass(nodeManager, defaultPolicy);
       }
       throw e;
     }
   }
 
   private static PipelineChoosePolicy createPipelineChoosePolicyFromClass(
+      final NodeManager nodeManager,
       Class<? extends PipelineChoosePolicy> policyClass) throws SCMException {
     Constructor<? extends PipelineChoosePolicy> constructor;
     try {
@@ -86,7 +88,7 @@ public final class PipelineChoosePolicyFactory {
     }
 
     try {
-      return constructor.newInstance();
+      return constructor.newInstance().init(nodeManager);
     } catch (Exception e) {
       throw new RuntimeException("Failed to instantiate class " +
           policyClass.getCanonicalName() + " for " + e.getMessage());
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1a3ea2515f..046be68760 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -804,9 +804,9 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
 
     ScmConfig scmConfig = conf.getObject(ScmConfig.class);
     pipelineChoosePolicy = PipelineChoosePolicyFactory
-        .getPolicy(scmConfig, false);
+        .getPolicy(scmNodeManager, scmConfig, false);
     ecPipelineChoosePolicy = PipelineChoosePolicyFactory
-        .getPolicy(scmConfig, true);
+        .getPolicy(scmNodeManager, scmConfig, true);
     if (configurator.getWritableContainerFactory() != null) {
       writableContainerFactory = configurator.getWritableContainerFactory();
     } else {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index 54d2ffed82..4f86450d03 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -34,7 +34,11 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import 
org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
+import 
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy;
 import 
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
 import 
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -54,8 +58,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
 import static 
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -84,7 +93,7 @@ public class TestWritableECContainerProvider {
   private OzoneConfiguration conf;
   private DBStore dbStore;
   private SCMHAManager scmhaManager;
-  private MockNodeManager nodeManager;
+  private static MockNodeManager nodeManager;
   private WritableContainerProvider<ECReplicationConfig> provider;
   private ECReplicationConfig repConfig;
 
@@ -93,8 +102,20 @@ public class TestWritableECContainerProvider {
 
   public static Collection<PipelineChoosePolicy> policies() {
     Collection<PipelineChoosePolicy> policies = new ArrayList<>();
+    // init nodeManager
+    NodeSchemaManager.getInstance().init(new NodeSchema[]
+        {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}, true);
+    NetworkTopologyImpl cluster =
+        new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+    int count = 10;
+    List<DatanodeDetails> datanodes = IntStream.range(0, count)
+        .mapToObj(i -> MockDatanodeDetails.randomDatanodeDetails())
+        .collect(Collectors.toList());
+    nodeManager = new MockNodeManager(cluster, datanodes, false, count);
+
     policies.add(new RandomPipelineChoosePolicy());
     policies.add(new HealthyPipelineChoosePolicy());
+    policies.add(new CapacityPipelineChoosePolicy().init(nodeManager));
     return policies;
   }
 
@@ -110,7 +131,6 @@ public class TestWritableECContainerProvider {
     dbStore = DBStoreBuilder.createDBStore(
         conf, new SCMDBDefinition());
     scmhaManager = SCMHAManagerStub.getInstance(true);
-    nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
         new MockPipelineManager(dbStore, scmhaManager, nodeManager);
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java
new file mode 100644
index 0000000000..421d2396bf
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the capacity pipeline choose policy.
+ */
+public class TestCapacityPipelineChoosePolicy {
+
+  @Test
+  public void testChoosePipeline() throws Exception {
+
+    // given 4 datanodes
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
+    //          dn0   dn1   dn2   dn3
+    // used       0   10    20    30
+    NodeManager mockNodeManager = mock(NodeManager.class);
+    when(mockNodeManager.getNodeStat(datanodes.get(0)))
+        .thenReturn(new SCMNodeMetric(100L, 0, 100L, 0, 0));
+    when(mockNodeManager.getNodeStat(datanodes.get(1)))
+        .thenReturn(new SCMNodeMetric(100L, 10L, 90L, 0, 0));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100L, 20L, 80L, 0, 0));
+    when(mockNodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(100L, 30L, 70L, 0, 0));
+
+    PipelineChoosePolicy policy = new 
CapacityPipelineChoosePolicy().init(mockNodeManager);
+
+    // generate 4 pipelines, and every pipeline has 3 datanodes
+    //
+    //  pipeline0    dn1   dn2   dn3
+    //  pipeline1    dn0   dn2   dn3
+    //  pipeline2    dn0   dn1   dn3
+    //  pipeline3    dn0   dn1   dn2
+    //
+    // In the above scenario, pipeline0 vs pipeline1 runs through three rounds
+    // of comparisons, (dn3 <-> dn3) -> (dn2 <-> dn2 ) -> (dn1 <-> dn0),
+    // finally comparing dn0 and dn1, and dn0 wins, so pipeline1 is selected.
+    //
+    List<Pipeline> pipelines = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      List<DatanodeDetails> dns = new ArrayList<>();
+      for (int j = 0; j < datanodes.size(); j++) {
+        if (i != j) {
+          dns.add(datanodes.get(j));
+        }
+      }
+      Pipeline pipeline = MockPipeline.createPipeline(dns);
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline);
+      pipelines.add(pipeline);
+    }
+
+    Map<Pipeline, Integer> selectedCount = new HashMap<>();
+    for (Pipeline pipeline : pipelines) {
+      selectedCount.put(pipeline, 0);
+    }
+    for (int i = 0; i < 1000; i++) {
+      // choosePipeline
+      Pipeline pipeline = policy.choosePipeline(pipelines, null);
+      assertNotNull(pipeline);
+      selectedCount.put(pipeline, selectedCount.get(pipeline) + 1);
+    }
+
+    // The selected count from most to least should be :
+    // pipeline3 > pipeline2 > pipeline1 > pipeline0
+    
assertThat(selectedCount.get(pipelines.get(3))).isGreaterThan(selectedCount.get(pipelines.get(2)));
+    
assertThat(selectedCount.get(pipelines.get(2))).isGreaterThan(selectedCount.get(pipelines.get(1)));
+    
assertThat(selectedCount.get(pipelines.get(1))).isGreaterThan(selectedCount.get(pipelines.get(0)));
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
index 7d0a72ed2f..82fed5953a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
@@ -21,7 +21,9 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
 import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -42,17 +44,20 @@ public class TestPipelineChoosePolicyFactory {
 
   private ScmConfig scmConfig;
 
+  private NodeManager nodeManager;
+
   @BeforeEach
   public void setup() {
     //initialize network topology instance
     conf = new OzoneConfiguration();
     scmConfig = conf.getObject(ScmConfig.class);
+    nodeManager = new MockNodeManager(true, 5);
   }
 
   @Test
   public void testDefaultPolicy() throws IOException {
     PipelineChoosePolicy policy = PipelineChoosePolicyFactory
-        .getPolicy(scmConfig, false);
+        .getPolicy(nodeManager, scmConfig, false);
     assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
         policy.getClass());
   }
@@ -60,7 +65,7 @@ public class TestPipelineChoosePolicyFactory {
   @Test
   public void testDefaultPolicyEC() throws IOException {
     PipelineChoosePolicy policy = PipelineChoosePolicyFactory
-        .getPolicy(scmConfig, true);
+        .getPolicy(nodeManager, scmConfig, true);
     assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
         policy.getClass());
   }
@@ -69,7 +74,7 @@ public class TestPipelineChoosePolicyFactory {
   public void testNonDefaultPolicyEC() throws IOException {
     scmConfig.setECPipelineChoosePolicyName(DummyGoodImpl.class.getName());
     PipelineChoosePolicy policy = PipelineChoosePolicyFactory
-        .getPolicy(scmConfig, true);
+        .getPolicy(nodeManager, scmConfig, true);
     assertSame(DummyGoodImpl.class, policy.getClass());
   }
 
@@ -121,10 +126,10 @@ public class TestPipelineChoosePolicyFactory {
     scmConfig.setPipelineChoosePolicyName(DummyImpl.class.getName());
     scmConfig.setECPipelineChoosePolicyName(DummyImpl.class.getName());
     PipelineChoosePolicy policy =
-        PipelineChoosePolicyFactory.getPolicy(scmConfig, false);
+        PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, false);
     assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
         policy.getClass());
-    policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true);
+    policy = PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, 
true);
     assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
         policy.getClass());
   }
@@ -137,10 +142,10 @@ public class TestPipelineChoosePolicyFactory {
     scmConfig.setECPipelineChoosePolicyName(
         "org.apache.hadoop.hdds.scm.pipeline.choose.policy.HelloWorld");
     PipelineChoosePolicy policy =
-        PipelineChoosePolicyFactory.getPolicy(scmConfig, false);
+        PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, false);
     assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
         policy.getClass());
-    policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true);
+    policy = PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, 
true);
     assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
         policy.getClass());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to