This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 39ed524a58 HDDS-11020. Implement RoundRobinPipelineChoosePolicy (#6832)
39ed524a58 is described below

commit 39ed524a581cbcfe00b34b794b4297b707a4e3b1
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Jun 25 21:46:29 2024 -0700

    HDDS-11020. Implement RoundRobinPipelineChoosePolicy (#6832)
---
 .../java/org/apache/hadoop/hdds/scm/ScmConfig.java |  60 ++++----
 .../algorithms/RoundRobinPipelineChoosePolicy.java |  63 ++++++++
 .../TestRoundRobinPipelineChoosePolicy.java        | 170 +++++++++++++++++++++
 .../rpc/TestOzoneClientRetriesOnExceptions.java    |   4 +-
 4 files changed, 262 insertions(+), 35 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
index 2fc04e00f2..3ef9317ced 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
@@ -63,51 +63,43 @@ public class ScmConfig extends ReconfigurableConfig {
   )
   private String action;
 
+  private static final String DESCRIPTION_COMMON_CHOICES_OF_PIPELINE_CHOOSE_POLICY_IMPL =
+      "One of the following values can be used: "  // appended to both choose-policy descriptions below
+      + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy"
+      + " : chooses a pipeline randomly. "
+      + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy"
+      + " : chooses a healthy pipeline randomly. "
+      + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy"
+      + " : chooses the pipeline with lower utilization from two random pipelines. Note that"
+      + " random choose method will be executed twice in this policy. "  // trailing space separates item (4)
+      + "(4) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RoundRobinPipelineChoosePolicy"
+      + " : chooses a pipeline in a round robin fashion. Intended for troubleshooting and testing purposes only.";
+
+  // hdds.scm.pipeline.choose.policy.impl
   @Config(key = "pipeline.choose.policy.impl",
       type = ConfigType.STRING,
-      defaultValue = "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms" +
-          ".RandomPipelineChoosePolicy",
+      defaultValue = 
"org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy",
       tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
       description =
-          "The full name of class which implements "
-          + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
-          + "The class decides which pipeline will be used to find or "
-          + "allocate Ratis containers. If not set, "
-          + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "RandomPipelineChoosePolicy will be used as default value. "
-          + "The following values can be used, "
-          + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "RandomPipelineChoosePolicy : random choose one pipeline. "
-          + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. 
"
-          + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
-          + "utilization from the two pipelines. Note that random choose "
-          + "method will be executed twice in this policy."
+          "Sets the policy for choosing a pipeline for a Ratis container. The 
value should be "
+          + "the full name of a class which implements 
org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+          + "The class decides which pipeline will be used to find or allocate 
Ratis containers. If not set, "
+          + 
"org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy"
+          + " will be used as default value. " + 
DESCRIPTION_COMMON_CHOICES_OF_PIPELINE_CHOOSE_POLICY_IMPL
   )
   private String pipelineChoosePolicyName;
 
+  // hdds.scm.ec.pipeline.choose.policy.impl
   @Config(key = "ec.pipeline.choose.policy.impl",
       type = ConfigType.STRING,
-      defaultValue = "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms" +
-          ".RandomPipelineChoosePolicy",
+      defaultValue = 
"org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy",
       tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
       description =
-          "The full name of class which implements "
-          + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
-          + "The class decides which pipeline will be used when "
-          + "selecting an EC Pipeline. If not set, "
-          + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "RandomPipelineChoosePolicy will be used as default value. "
-          + "The following values can be used, "
-          + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "RandomPipelineChoosePolicy : random choose one pipeline. "
-          + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. 
"
-          + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
-          + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
-          + "utilization from the two pipelines. Note that random choose "
-          + "method will be executed twice in this policy."
+          "Sets the policy for choosing an EC pipeline. The value should be "
+          + "the full name of a class which implements 
org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+          + "The class decides which pipeline will be used when selecting an 
EC Pipeline. If not set, "
+          + 
"org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy"
+          + " will be used as default value. " + 
DESCRIPTION_COMMON_CHOICES_OF_PIPELINE_CHOOSE_POLICY_IMPL
   )
   private String ecPipelineChoosePolicyName;
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RoundRobinPipelineChoosePolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RoundRobinPipelineChoosePolicy.java
new file mode 100644
index 0000000000..0fb1a1243d
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RoundRobinPipelineChoosePolicy.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Round-robin choose policy that chooses pipeline in a round-robin fashion.
+ * Only useful for debugging and testing purposes, at least for now.
+ */
+public class RoundRobinPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  public static final Logger LOG = LoggerFactory.getLogger(RoundRobinPipelineChoosePolicy.class);
+
+  // Position in the pipeline list that the next call will hand out.
+  private int nextPipelineIndex = 0;
+
+  /** Returns the list element at the position picked by choosePipelineIndex. */
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList, PipelineRequestInformation pri) {
+    return pipelineList.get(choosePipelineIndex(pipelineList, pri));
+  }
+
+  /**
+   * Picks the next position in rotation, wrapping around at the end of the list.
+   * @param pipelineList candidate pipelines; only the size is read here
+   * @param pri          request information; not read by this policy
+   * @return position within {@code pipelineList} of the chosen pipeline
+   */
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList, PipelineRequestInformation pri) {
+    final int size = pipelineList.size();
+    final int picked;
+    synchronized (this) {
+      // Wrap before use: the stored index may exceed a list shorter than the previous one.
+      picked = nextPipelineIndex % size;
+      nextPipelineIndex = picked + 1;
+    }
+    LOG.debug("chosenIndex = {}, numPipelines = {}", picked, size);
+    return picked;
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestRoundRobinPipelineChoosePolicy.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestRoundRobinPipelineChoosePolicy.java
new file mode 100644
index 0000000000..2dc7958631
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestRoundRobinPipelineChoosePolicy.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for the round-robin pipeline choose policy.
+ */
+public class TestRoundRobinPipelineChoosePolicy {
+  private static final int NUM_DATANODES = 4;
+  private static final int NUM_PIPELINES = 4;
+  private PipelineChoosePolicy policy;
+  private List<Pipeline> allPipelines;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    // Fresh policy and pipeline list for every test method.
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
+
+    NodeManager mockNodeManager = mock(NodeManager.class);
+    policy = new RoundRobinPipelineChoosePolicy().init(mockNodeManager);
+
+    // 4 pipelines with each pipeline having 3 datanodes
+    //
+    //  pipeline0    dn1   dn2   dn3
+    //  pipeline1    dn0   dn2   dn3
+    //  pipeline2    dn0   dn1   dn3
+    //  pipeline3    dn0   dn1   dn2
+    //
+    allPipelines = new ArrayList<>();
+    for (int i = 0; i < NUM_PIPELINES; i++) {
+      List<DatanodeDetails> dns = new ArrayList<>();
+      for (int j = 0; j < datanodes.size(); j++) {
+        if (i != j) {
+          dns.add(datanodes.get(j));
+        }
+      }
+      Pipeline pipeline = MockPipeline.createPipeline(dns);
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline);
+      allPipelines.add(pipeline);
+    }
+  }
+
+  /** Asserts that the i-th pipeline of allPipelines was selected arrExpectCount[i] times. */
+  private void verifySelectedCountMap(Map<Pipeline, Integer> selectedCountMap, int[] arrExpectCount) {
+    for (int i = 0; i < NUM_PIPELINES; i++) {
+      assertEquals(arrExpectCount[i], selectedCountMap.getOrDefault(allPipelines.get(i), 0));
+    }
+  }
+
+  /** With a fixed list, successive calls walk the pipelines in list order and wrap around. */
+  @Test
+  public void testChoosePipeline() {
+    Map<Pipeline, Integer> selectedCountMap = new HashMap<>();
+    final int numContainers = 100;
+    for (int i = 0; i < numContainers; i++) {
+      Pipeline pipeline = policy.choosePipeline(allPipelines, null);
+      assertNotNull(pipeline);
+      assertEquals(allPipelines.get(i % NUM_PIPELINES), pipeline);
+      selectedCountMap.merge(pipeline, 1, Integer::sum);
+    }
+
+    // Each pipeline would be chosen 100 / 4 = 25 times
+    verifySelectedCountMap(selectedCountMap, new int[] {25, 25, 25, 25});
+  }
+
+  /** The rotation continues from the policy's stored index when the list grows or shrinks between calls. */
+  @Test
+  public void testChoosePipelineListVaries() {
+    Map<Pipeline, Integer> selectedCountMap;
+    // A pipeline list that holds only a subset of the pipelines for this test
+    List<Pipeline> availablePipelines = new ArrayList<>();
+    int numAvailablePipeline;
+
+    // Case 1. Only pipeline0 is available; the policy's next index starts at 0
+    availablePipelines.add(allPipelines.get(0));
+    numAvailablePipeline = availablePipelines.size();
+    selectedCountMap = new HashMap<>();
+    for (int i = 0; i < 10; i++) {
+      final Pipeline pipeline = policy.choosePipeline(availablePipelines, null);
+      assertEquals(availablePipelines.get(i % numAvailablePipeline), pipeline);
+      selectedCountMap.merge(pipeline, 1, Integer::sum);
+    }
+    // pipeline0 is selected 10 times
+    verifySelectedCountMap(selectedCountMap, new int[] {10, 0, 0, 0});
+
+    // Case 2. pipeline0 and pipeline1 are available; the policy's next index is 1 after case 1
+    availablePipelines.add(allPipelines.get(1));
+    numAvailablePipeline = availablePipelines.size();
+    selectedCountMap = new HashMap<>();
+    for (int i = 0; i < 10; i++) {
+      final Pipeline pipeline = policy.choosePipeline(availablePipelines, null);
+      assertEquals(availablePipelines.get((i + 1) % numAvailablePipeline), pipeline);
+      selectedCountMap.merge(pipeline, 1, Integer::sum);
+    }
+    // pipeline0 and pipeline1 are selected 5 times each
+    verifySelectedCountMap(selectedCountMap, new int[] {5, 5, 0, 0});
+
+    // Case 3. pipeline0, pipeline1 and pipeline2 are available; the policy's next index is 1 after case 2
+    availablePipelines.add(allPipelines.get(2));
+    numAvailablePipeline = availablePipelines.size();
+    selectedCountMap = new HashMap<>();
+    for (int i = 0; i < 10; i++) {
+      final Pipeline pipeline = policy.choosePipeline(availablePipelines, null);
+      assertEquals(availablePipelines.get((i + 1) % numAvailablePipeline), pipeline);
+      selectedCountMap.merge(pipeline, 1, Integer::sum);
+    }
+    // pipeline0-2 are selected 3-4 times each
+    verifySelectedCountMap(selectedCountMap, new int[] {3, 4, 3, 0});
+
+    // Case 4. All 4 pipelines are available; the policy's next index is 2 after case 3
+    availablePipelines.add(allPipelines.get(3));
+    numAvailablePipeline = availablePipelines.size();
+    selectedCountMap = new HashMap<>();
+    for (int i = 0; i < 10; i++) {
+      final Pipeline pipeline = policy.choosePipeline(availablePipelines, null);
+      assertEquals(availablePipelines.get((i + 2) % numAvailablePipeline), pipeline);
+      selectedCountMap.merge(pipeline, 1, Integer::sum);
+    }
+    // pipeline0-3 are selected 2-3 times each
+    verifySelectedCountMap(selectedCountMap, new int[] {2, 2, 3, 3});
+
+    // Case 5. Remove pipeline0 from the available pipeline list; next index 4 wraps to 4 % 3 = 1
+    availablePipelines.remove(allPipelines.get(0));
+    numAvailablePipeline = availablePipelines.size();
+    selectedCountMap = new HashMap<>();
+    for (int i = 0; i < 10; i++) {
+      final Pipeline pipeline = policy.choosePipeline(availablePipelines, null);
+      assertEquals(availablePipelines.get((i + 1) % numAvailablePipeline), pipeline);
+      selectedCountMap.merge(pipeline, 1, Integer::sum);
+    }
+    // pipeline1-3 are selected 3-4 times each
+    verifySelectedCountMap(selectedCountMap, new int[] {0, 3, 4, 3});
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
index 3e1667a38a..051fef4b94 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import 
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RoundRobinPipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.ClientConfigForTesting;
@@ -107,6 +108,7 @@ public class TestOzoneClientRetriesOnExceptions {
     conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
+    conf.set("hdds.scm.pipeline.choose.policy.impl", 
RoundRobinPipelineChoosePolicy.class.getName());
     conf.setQuietMode(false);
 
     ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
@@ -216,7 +218,7 @@ public class TestOzoneClientRetriesOnExceptions {
                 .getPipeline(container.getPipelineID());
         XceiverClientSpi xceiverClient =
             xceiverClientManager.acquireClient(pipeline);
-        Assumptions.assumeFalse(containerList.contains(containerID));
+        assertThat(containerList.contains(containerID)).isFalse(); // ID must not be recorded yet
         containerList.add(containerID);
         xceiverClient.sendCommand(ContainerTestHelper
             .getCreateContainerRequest(containerID, pipeline));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to