This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 39ed524a58 HDDS-11020. Implement RoundRobinPipelineChoosePolicy (#6832)
39ed524a58 is described below
commit 39ed524a581cbcfe00b34b794b4297b707a4e3b1
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Jun 25 21:46:29 2024 -0700
HDDS-11020. Implement RoundRobinPipelineChoosePolicy (#6832)
---
.../java/org/apache/hadoop/hdds/scm/ScmConfig.java | 60 ++++----
.../algorithms/RoundRobinPipelineChoosePolicy.java | 63 ++++++++
.../TestRoundRobinPipelineChoosePolicy.java | 170 +++++++++++++++++++++
.../rpc/TestOzoneClientRetriesOnExceptions.java | 4 +-
4 files changed, 262 insertions(+), 35 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
index 2fc04e00f2..3ef9317ced 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
@@ -63,51 +63,43 @@ public class ScmConfig extends ReconfigurableConfig {
)
private String action;
+ /**
+ * Suffix shared by the descriptions of the Ratis and EC pipeline choose policy
+ * configs, listing the built-in PipelineChoosePolicy implementations.
+ * Every entry ends with ". " so consecutive entries stay separated.
+ */
+ private static final String DESCRIPTION_COMMON_CHOICES_OF_PIPELINE_CHOOSE_POLICY_IMPL =
+ "One of the following values can be used: "
+ + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy"
+ + " : chooses a pipeline randomly. "
+ + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy"
+ + " : chooses a healthy pipeline randomly. "
+ + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy"
+ + " : chooses the pipeline with lower utilization from two random pipelines. Note that"
+ + " random choose method will be executed twice in this policy. "
+ + "(4) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RoundRobinPipelineChoosePolicy"
+ + " : chooses a pipeline in a round robin fashion. Intended for troubleshooting and testing purposes only.";
+
+ // hdds.scm.pipeline.choose.policy.impl : pipeline choose policy for Ratis containers
@Config(key = "pipeline.choose.policy.impl",
type = ConfigType.STRING,
- defaultValue = "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms" +
- ".RandomPipelineChoosePolicy",
+ defaultValue = "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy",
tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
description =
- "The full name of class which implements "
- + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
- + "The class decides which pipeline will be used to find or "
- + "allocate Ratis containers. If not set, "
- + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "RandomPipelineChoosePolicy will be used as default value. "
- + "The following values can be used, "
- + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "RandomPipelineChoosePolicy : random choose one pipeline. "
- + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. "
- + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
- + "utilization from the two pipelines. Note that random choose "
- + "method will be executed twice in this policy."
+ "Sets the policy for choosing a pipeline for a Ratis container. The value should be "
+ + "the full name of a class which implements org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ + "The class decides which pipeline will be used to find or allocate Ratis containers. If not set, "
+ + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy"
+ + " will be used as default value. " + DESCRIPTION_COMMON_CHOICES_OF_PIPELINE_CHOOSE_POLICY_IMPL
)
private String pipelineChoosePolicyName;
+ // hdds.scm.ec.pipeline.choose.policy.impl : pipeline choose policy for EC pipelines
@Config(key = "ec.pipeline.choose.policy.impl",
type = ConfigType.STRING,
- defaultValue = "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms" +
- ".RandomPipelineChoosePolicy",
+ defaultValue = "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy",
tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
description =
- "The full name of class which implements "
- + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
- + "The class decides which pipeline will be used when "
- + "selecting an EC Pipeline. If not set, "
- + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "RandomPipelineChoosePolicy will be used as default value. "
- + "The following values can be used, "
- + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "RandomPipelineChoosePolicy : random choose one pipeline. "
- + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. "
- + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
- + "CapacityPipelineChoosePolicy : choose the pipeline with lower "
- + "utilization from the two pipelines. Note that random choose "
- + "method will be executed twice in this policy."
+ "Sets the policy for choosing an EC pipeline. The value should be "
+ + "the full name of a class which implements org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ + "The class decides which pipeline will be used when selecting an EC Pipeline. If not set, "
+ + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy"
+ + " will be used as default value. " + DESCRIPTION_COMMON_CHOICES_OF_PIPELINE_CHOOSE_POLICY_IMPL
)
private String ecPipelineChoosePolicyName;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RoundRobinPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RoundRobinPipelineChoosePolicy.java
new file mode 100644
index 0000000000..0fb1a1243d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RoundRobinPipelineChoosePolicy.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Round-robin choose policy that chooses pipeline in a round-robin fashion.
+ * Only useful for debugging and testing purposes, at least for now.
+ *
+ * <p>The position of the next pipeline is stored per policy instance and is
+ * read and advanced inside a {@code synchronized (this)} block, so concurrent
+ * calls on the same instance observe one consistent sequence. The stored
+ * position is wrapped modulo the size of whatever list is passed in, so the
+ * candidate list may grow or shrink between calls.
+ */
+public class RoundRobinPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RoundRobinPipelineChoosePolicy.class);
+
+  // Index of the next pipeline to be returned; only accessed while holding "this".
+  private int nextPipelineIndex = 0;
+
+  /**
+   * Chooses the next pipeline from the given list in round-robin order.
+   * @param pipelineList non-empty list of candidate pipelines
+   * @param pri PipelineRequestInformation (not used by this policy)
+   * @return the pipeline at the index returned by
+   *         {@link #choosePipelineIndex(List, PipelineRequestInformation)}
+   * @throws IllegalArgumentException if {@code pipelineList} is empty
+   */
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    return pipelineList.get(choosePipelineIndex(pipelineList, pri));
+  }
+
+  /**
+   * Given a list of pipelines, return the index of the chosen pipeline.
+   * @param pipelineList List of pipelines; must not be empty
+   * @param pri PipelineRequestInformation (not used by this policy)
+   * @return Index in the list of the chosen pipeline.
+   * @throws IllegalArgumentException if {@code pipelineList} is empty
+   */
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    final int numPipelines = pipelineList.size();
+    if (numPipelines == 0) {
+      // Without this guard the modulo below fails with an opaque
+      // ArithmeticException ("/ by zero").
+      throw new IllegalArgumentException("pipelineList must not be empty");
+    }
+    final int chosenIndex;
+    synchronized (this) {
+      // Wrap first: the list may be shorter than it was on the previous call.
+      chosenIndex = nextPipelineIndex % numPipelines;
+      nextPipelineIndex = chosenIndex + 1;
+    }
+    LOG.debug("chosenIndex = {}, numPipelines = {}", chosenIndex, numPipelines);
+    return chosenIndex;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestRoundRobinPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestRoundRobinPipelineChoosePolicy.java
new file mode 100644
index 0000000000..2dc7958631
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestRoundRobinPipelineChoosePolicy.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for the round-robin pipeline choose policy.
+ */
+public class TestRoundRobinPipelineChoosePolicy {
+  private static final int NUM_DATANODES = 4;
+  private static final int NUM_PIPELINES = 4;
+  private PipelineChoosePolicy policy;
+  private List<Pipeline> allPipelines;
+
+  @BeforeEach
+  public void setup() throws Exception {
+
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
+
+    NodeManager mockNodeManager = mock(NodeManager.class);
+    policy = new RoundRobinPipelineChoosePolicy().init(mockNodeManager);
+
+    // 4 pipelines with each pipeline having 3 datanodes
+    //
+    // pipeline0 dn1 dn2 dn3
+    // pipeline1 dn0 dn2 dn3
+    // pipeline2 dn0 dn1 dn3
+    // pipeline3 dn0 dn1 dn2
+    //
+    allPipelines = new ArrayList<>();
+    for (int i = 0; i < NUM_PIPELINES; i++) {
+      List<DatanodeDetails> dns = new ArrayList<>();
+      for (int j = 0; j < datanodes.size(); j++) {
+        if (i != j) {
+          dns.add(datanodes.get(j));
+        }
+      }
+      Pipeline pipeline = MockPipeline.createPipeline(dns);
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline);
+      allPipelines.add(pipeline);
+    }
+  }
+
+  /**
+   * Asserts how many times each entry of allPipelines was chosen.
+   * @param selectedCountMap pipeline to number of times it was chosen
+   * @param arrExpectCount expected count for allPipelines[0..NUM_PIPELINES)
+   */
+  private void verifySelectedCountMap(Map<Pipeline, Integer> selectedCountMap, int[] arrExpectCount) {
+    for (int i = 0; i < NUM_PIPELINES; i++) {
+      assertEquals(arrExpectCount[i], selectedCountMap.getOrDefault(allPipelines.get(i), 0));
+    }
+  }
+
+  /**
+   * Calls the policy numChoices times on candidates and checks that the i-th
+   * call returns candidates.get((i + startOffset) % candidates.size()).
+   * @param candidates pipelines offered to the policy
+   * @param startOffset index within candidates expected on the first call
+   * @param numChoices number of calls to make
+   * @return how many times each pipeline was chosen
+   */
+  private Map<Pipeline, Integer> chooseAndCount(List<Pipeline> candidates,
+      int startOffset, int numChoices) {
+    final int numCandidates = candidates.size();
+    Map<Pipeline, Integer> selectedCountMap = new HashMap<>();
+    for (int i = 0; i < numChoices; i++) {
+      final Pipeline pipeline = policy.choosePipeline(candidates, null);
+      assertNotNull(pipeline);
+      assertEquals(candidates.get((i + startOffset) % numCandidates), pipeline);
+      selectedCountMap.merge(pipeline, 1, Integer::sum);
+    }
+    return selectedCountMap;
+  }
+
+  @Test
+  public void testChoosePipeline() {
+    final int numContainers = 100;
+    Map<Pipeline, Integer> selectedCountMap = chooseAndCount(allPipelines, 0, numContainers);
+
+    // Each pipeline would be chosen 100 / 4 = 25 times
+    verifySelectedCountMap(selectedCountMap, new int[] {25, 25, 25, 25});
+  }
+
+  @Test
+  public void testChoosePipelineListVaries() {
+    // A pipeline list that holds only a subset of the pipelines for this test
+    List<Pipeline> availablePipelines = new ArrayList<>();
+
+    // The policy keeps its position across calls and wraps it modulo the size
+    // of the list passed in, so each case starts where the previous one left
+    // off. The startOffset passed to chooseAndCount is that carried-over position.
+
+    // Case 1. Only pipeline0 is available
+    availablePipelines.add(allPipelines.get(0));
+    // pipeline0 is selected 10 times
+    verifySelectedCountMap(chooseAndCount(availablePipelines, 0, 10), new int[] {10, 0, 0, 0});
+
+    // Case 2. pipeline0 and pipeline1 are available; stored position is 1
+    availablePipelines.add(allPipelines.get(1));
+    // pipeline0 and pipeline1 are selected 5 times each
+    verifySelectedCountMap(chooseAndCount(availablePipelines, 1, 10), new int[] {5, 5, 0, 0});
+
+    // Case 3. pipeline0, pipeline1 and pipeline2 are available; stored position is 1
+    availablePipelines.add(allPipelines.get(2));
+    // pipeline0-2 are selected 3-4 times each
+    verifySelectedCountMap(chooseAndCount(availablePipelines, 1, 10), new int[] {3, 4, 3, 0});
+
+    // Case 4. All 4 pipelines are available; stored position is 2
+    availablePipelines.add(allPipelines.get(3));
+    // pipeline0-3 are selected 2-3 times each
+    verifySelectedCountMap(chooseAndCount(availablePipelines, 2, 10), new int[] {2, 2, 3, 3});
+
+    // Case 5. Remove pipeline0 from the available pipeline list; stored position is 4, 4 % 3 = 1
+    availablePipelines.remove(allPipelines.get(0));
+    // pipeline1-3 are selected 3-4 times each
+    verifySelectedCountMap(chooseAndCount(availablePipelines, 1, 10), new int[] {0, 3, 4, 3});
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
index 3e1667a38a..051fef4b94 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RoundRobinPipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.ClientConfigForTesting;
@@ -107,6 +108,7 @@ public class TestOzoneClientRetriesOnExceptions {
conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
+ conf.set("hdds.scm.pipeline.choose.policy.impl", RoundRobinPipelineChoosePolicy.class.getName());
conf.setQuietMode(false);
ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
@@ -216,7 +218,7 @@ public class TestOzoneClientRetriesOnExceptions {
.getPipeline(container.getPipelineID());
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
- Assumptions.assumeFalse(containerList.contains(containerID));
+ assertThat(containerList).doesNotContain(containerID);
containerList.add(containerID);
xceiverClient.sendCommand(ContainerTestHelper
.getCreateContainerRequest(containerID, pipeline));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]