This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1859d72b99 HDDS-8682. EC: Avoid O(n) array.remove(element) when
filtering pipelines in WritableECContainerProvider (#4767)
1859d72b99 is described below
commit 1859d72b99c73ff2daaf8ec713ec8fd6911b555a
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu May 25 23:04:20 2023 +0100
HDDS-8682. EC: Avoid O(n) array.remove(element) when filtering pipelines in
WritableECContainerProvider (#4767)
---
.../hadoop/hdds/scm/PipelineChoosePolicy.java | 14 ++-
.../scm/pipeline/WritableECContainerProvider.java | 19 ++-
.../algorithms/HealthyPipelineChoosePolicy.java | 16 ++-
.../algorithms/RandomPipelineChoosePolicy.java | 15 ++-
.../pipeline/TestWritableECContainerProvider.java | 133 +++++++++++++--------
.../TestPipelineChoosePolicyFactory.java | 6 +
6 files changed, 138 insertions(+), 65 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
index c829e2eab6..76439a7846 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
@@ -30,8 +30,20 @@ public interface PipelineChoosePolicy {
* Given an initial list of pipelines, return one of the pipelines.
*
* @param pipelineList list of pipelines.
- * @return one of the pipelines.
+ * @return one of the pipelines or null if no pipeline can be selected.
*/
Pipeline choosePipeline(List<Pipeline> pipelineList,
PipelineRequestInformation pri);
+
+ /**
+ * Given a list of pipelines, return the index of the chosen pipeline.
+ * @param pipelineList List of pipelines
+ * @param pri PipelineRequestInformation
+ * @return Index in the list of the chosen pipeline, or -1 if no pipeline
+ * could be selected.
+ */
+ default int choosePipelineIndex(List<Pipeline> pipelineList,
+ PipelineRequestInformation pri) {
+ return pipelineList == null || pipelineList.isEmpty() ? -1 : 0;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index 48173d1f63..48594b2faf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -87,7 +87,7 @@ public class WritableECContainerProvider
* @param owner The owner of the container
* @param excludeList A set of datanodes, container and pipelines which should
* not be considered.
- * @return A containerInfo representing a block group with with space for the
+ * @return A containerInfo representing a block group with space for the
* write, or null if no container can be allocated.
*/
@Override
@@ -116,25 +116,24 @@ public class WritableECContainerProvider
.setSize(size)
.build();
while (existingPipelines.size() > 0) {
- Pipeline pipeline =
- pipelineChoosePolicy.choosePipeline(existingPipelines, pri);
- if (pipeline == null) {
+ int pipelineIndex =
+ pipelineChoosePolicy.choosePipelineIndex(existingPipelines, pri);
+ if (pipelineIndex < 0) {
LOG.warn("Unable to select a pipeline from {} in the list",
existingPipelines.size());
break;
}
+ Pipeline pipeline = existingPipelines.get(pipelineIndex);
synchronized (pipeline.getId()) {
try {
ContainerInfo containerInfo = getContainerFromPipeline(pipeline);
if (containerInfo == null
|| !containerHasSpace(containerInfo, size)) {
- // This is O(n), which isn't great if there are a lot of pipelines
- // and we keep finding pipelines without enough space.
- existingPipelines.remove(pipeline);
+ existingPipelines.remove(pipelineIndex);
pipelineManager.closePipeline(pipeline, true);
} else {
if (containerIsExcluded(containerInfo, excludeList)) {
- existingPipelines.remove(pipeline);
+ existingPipelines.remove(pipelineIndex);
} else {
return containerInfo;
}
@@ -142,13 +141,13 @@ public class WritableECContainerProvider
} catch (PipelineNotFoundException | ContainerNotFoundException e) {
LOG.warn("Pipeline or container not found when selecting a writable "
+ "container", e);
- existingPipelines.remove(pipeline);
+ existingPipelines.remove(pipelineIndex);
pipelineManager.closePipeline(pipeline, true);
}
}
}
// If we get here, all the pipelines we tried were no good. So try to
- // allocate a new one and usePipelineManagerV2Impl.java it.
+ // allocate a new one.
try {
synchronized (this) {
return allocateContainer(repConfig, size, owner, excludeList);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java
index 9f77caa682..678627a96e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java
@@ -17,23 +17,27 @@
package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import java.util.ArrayList;
import java.util.List;
/**
* The healthy pipeline choose policy that chooses pipeline
* until return healthy pipeline.
*/
-public class HealthyPipelineChoosePolicy extends RandomPipelineChoosePolicy {
+public class HealthyPipelineChoosePolicy implements PipelineChoosePolicy {
+
+ private PipelineChoosePolicy randomPolicy = new RandomPipelineChoosePolicy();
@Override
public Pipeline choosePipeline(List<Pipeline> pipelineList,
PipelineRequestInformation pri) {
Pipeline fallback = null;
while (pipelineList.size() > 0) {
- Pipeline pipeline = super.choosePipeline(pipelineList, pri);
+ Pipeline pipeline = randomPolicy.choosePipeline(pipelineList, pri);
if (pipeline.isHealthy()) {
return pipeline;
} else {
@@ -43,4 +47,12 @@ public class HealthyPipelineChoosePolicy extends RandomPipelineChoosePolicy {
}
return fallback;
}
+
+ @Override
+ public int choosePipelineIndex(List<Pipeline> pipelineList,
+ PipelineRequestInformation pri) {
+ List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+ Pipeline pipeline = choosePipeline(mutableList, pri);
+ return pipelineList.indexOf(pipeline);
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java
index 457e391abb..ea6a0ee70e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java
@@ -34,6 +34,19 @@ public class RandomPipelineChoosePolicy implements PipelineChoosePolicy {
@SuppressWarnings("java:S2245") // no need for secure random
public Pipeline choosePipeline(List<Pipeline> pipelineList,
PipelineRequestInformation pri) {
- return pipelineList.get((int) (Math.random() * pipelineList.size()));
+ return pipelineList.get(choosePipelineIndex(pipelineList, pri));
+ }
+
+ /**
+ * Given a list of pipelines, return the index of the chosen pipeline.
+ * @param pipelineList List of pipelines
+ * @param pri PipelineRequestInformation
+ * @return Index in the list of the chosen pipeline, or -1 if no pipeline
+ * could be selected.
+ */
+ @Override
+ public int choosePipelineIndex(List<Pipeline> pipelineList,
+ PipelineRequestInformation pri) {
+ return (int) (Math.random() * pipelineList.size());
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index 37a91443fc..f0245c4a0f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -37,17 +37,21 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -76,8 +80,6 @@ public class TestWritableECContainerProvider {
private PipelineManager pipelineManager;
private final ContainerManager containerManager
= Mockito.mock(ContainerManager.class);
- private final PipelineChoosePolicy pipelineChoosingPolicy
- = new HealthyPipelineChoosePolicy();
private OzoneConfiguration conf;
private DBStore dbStore;
@@ -89,6 +91,14 @@ public class TestWritableECContainerProvider {
private Map<ContainerID, ContainerInfo> containers;
private WritableECContainerProviderConfig providerConf;
+ @Parameterized.Parameters
+ public static Collection<PipelineChoosePolicy> policies() {
+ Collection<PipelineChoosePolicy> policies = new ArrayList<>();
+ policies.add(new RandomPipelineChoosePolicy());
+ policies.add(new HealthyPipelineChoosePolicy());
+ return policies;
+ }
+
@BeforeEach
public void setup() throws IOException {
repConfig = new ECReplicationConfig(3, 2);
@@ -107,8 +117,6 @@ public class TestWritableECContainerProvider {
pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
- provider = createSubject();
-
Mockito.doAnswer(call -> {
Pipeline pipeline = (Pipeline)call.getArguments()[2];
ContainerInfo container = createContainer(pipeline,
@@ -126,20 +134,23 @@ public class TestWritableECContainerProvider {
}
- private WritableContainerProvider<ECReplicationConfig> createSubject() {
- return createSubject(pipelineManager);
+ private WritableContainerProvider<ECReplicationConfig> createSubject(
+ PipelineChoosePolicy policy) {
+ return createSubject(pipelineManager, policy);
}
private WritableContainerProvider<ECReplicationConfig> createSubject(
- PipelineManager customPipelineManager) {
+ PipelineManager customPipelineManager, PipelineChoosePolicy policy) {
return new WritableECContainerProvider(providerConf, getMaxContainerSize(),
nodeManager, customPipelineManager, containerManager,
- pipelineChoosingPolicy);
+ policy);
}
- @Test
- void testPipelinesCreatedBasedOnTotalDiskCount()
+ @ParameterizedTest
+ @MethodSource("policies")
+ void testPipelinesCreatedBasedOnTotalDiskCount(PipelineChoosePolicy policy)
throws IOException, TimeoutException {
+ provider = createSubject(policy);
providerConf.setMinimumPipelines(1);
nodeManager.setNumHealthyVolumes(20);
@@ -149,9 +160,11 @@ public class TestWritableECContainerProvider {
assertReusesExisting(allocated, pipelineLimit);
}
- @Test
- void testPipelinesCreatedBasedOnTotalDiskCountWithFactor()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ void testPipelinesCreatedBasedOnTotalDiskCountWithFactor(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
+ provider = createSubject(policy);
int factor = 10;
providerConf.setMinimumPipelines(1);
providerConf.setPipelinePerVolumeFactor(factor);
@@ -163,9 +176,11 @@ public class TestWritableECContainerProvider {
assertReusesExisting(allocated, pipelineLimit);
}
- @Test
- void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
+ provider = createSubject(policy);
int minimumPipelines = providerConf.getMinimumPipelines();
Set<ContainerInfo> allocated = assertDistinctContainers(minimumPipelines);
assertReusesExisting(allocated, minimumPipelines);
@@ -194,9 +209,11 @@ public class TestWritableECContainerProvider {
}
}
- @Test
- public void testPiplineLimitIgnoresExcludedPipelines()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testPiplineLimitIgnoresExcludedPipelines(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
+ provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers = new HashSet<>();
for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
@@ -216,9 +233,11 @@ public class TestWritableECContainerProvider {
assertTrue(allocatedContainers.contains(c));
}
- @Test
- public void testNewPipelineCreatedIfAllPipelinesExcluded()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testNewPipelineCreatedIfAllPipelinesExcluded(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
+ provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers = new HashSet<>();
for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
@@ -236,9 +255,11 @@ public class TestWritableECContainerProvider {
assertFalse(allocatedContainers.contains(c));
}
- @Test
- public void testNewPipelineCreatedIfAllContainersExcluded()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testNewPipelineCreatedIfAllContainersExcluded(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
+ provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers = new HashSet<>();
for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
@@ -256,9 +277,10 @@ public class TestWritableECContainerProvider {
assertFalse(allocatedContainers.contains(c));
}
- @Test
- public void testUnableToCreateAnyPipelinesThrowsException()
- throws IOException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testUnableToCreateAnyPipelinesThrowsException(
+ PipelineChoosePolicy policy) throws IOException {
pipelineManager = new MockPipelineManager(
dbStore, scmhaManager, nodeManager) {
@Override
@@ -268,7 +290,7 @@ public class TestWritableECContainerProvider {
throw new IOException("Cannot create pipelines");
}
};
- provider = createSubject();
+ provider = createSubject(policy);
try {
provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -278,9 +300,10 @@ public class TestWritableECContainerProvider {
}
}
- @Test
- public void testExistingPipelineReturnedWhenNewCannotBeCreated()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testExistingPipelineReturnedWhenNewCannotBeCreated(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
pipelineManager = new MockPipelineManager(
dbStore, scmhaManager, nodeManager) {
@@ -298,7 +321,7 @@ public class TestWritableECContainerProvider {
return super.createPipeline(repConfig);
}
};
- provider = createSubject();
+ provider = createSubject(policy);
try {
provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -317,9 +340,11 @@ public class TestWritableECContainerProvider {
}
}
- @Test
- public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
+ provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers =
assertDistinctContainers(providerConf.getMinimumPipelines());
// Update all the containers to make them nearly full, but with enough space
@@ -349,9 +374,10 @@ public class TestWritableECContainerProvider {
}
}
- @Test
- public void testPipelineNotFoundWhenAttemptingToUseExisting()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testPipelineNotFoundWhenAttemptingToUseExisting(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
// Ensure PM throws PNF exception when we ask for the containers in the
// pipeline
pipelineManager = new MockPipelineManager(
@@ -363,7 +389,7 @@ public class TestWritableECContainerProvider {
throw new PipelineNotFoundException("Simulated exception");
}
};
- provider = createSubject();
+ provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers =
assertDistinctContainers(providerConf.getMinimumPipelines());
@@ -376,9 +402,11 @@ public class TestWritableECContainerProvider {
assertFalse(allocatedContainers.contains(newContainer));
}
- @Test
- public void testContainerNotFoundWhenAttemptingToUseExisting()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testContainerNotFoundWhenAttemptingToUseExisting(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
+ provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers =
assertDistinctContainers(providerConf.getMinimumPipelines());
@@ -400,12 +428,14 @@ public class TestWritableECContainerProvider {
}
}
- @Test
- public void testPipelineOpenButContainerRemovedFromIt()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testPipelineOpenButContainerRemovedFromIt(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
// This can happen if the container close process is triggered from the DN.
// When tha happens, CM will change the container state to CLOSING and
// remove it from the container list in pipeline Manager.
+ provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers = new HashSet<>();
for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
@@ -425,11 +455,12 @@ public class TestWritableECContainerProvider {
}
}
- @Test
- public void testExcludedNodesPassedToCreatePipelineIfProvided()
- throws IOException, TimeoutException {
+ @ParameterizedTest
+ @MethodSource("policies")
+ public void testExcludedNodesPassedToCreatePipelineIfProvided(
+ PipelineChoosePolicy policy) throws IOException, TimeoutException {
PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
- provider = createSubject(pipelineManagerSpy);
+ provider = createSubject(pipelineManagerSpy, policy);
ExcludeList excludeList = new ExcludeList();
// EmptyList should be passed if there are no nodes excluded.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
index 17aec52137..53bd2f0044 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
@@ -71,6 +71,12 @@ public class TestPipelineChoosePolicyFactory {
PipelineRequestInformation pri) {
return null;
}
+
+ @Override
+ public int choosePipelineIndex(List<Pipeline> pipelineList,
+ PipelineRequestInformation pri) {
+ return -1;
+ }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]