This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a34accd12e HDDS-8698. EC: Avoid unbounded pipeline creation if no
existing pipelines meet criteria (#4829)
a34accd12e is described below
commit a34accd12e5494a5f807a25b2fe77cd19886014f
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Jun 5 20:25:35 2023 +0100
HDDS-8698. EC: Avoid unbounded pipeline creation if no existing pipelines
meet criteria (#4829)
---
.../scm/pipeline/WritableECContainerProvider.java | 18 +++++++++++-----
.../pipeline/TestWritableECContainerProvider.java | 24 ++++++++++------------
2 files changed, 24 insertions(+), 18 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index 48594b2faf..47a7a32ab5 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -94,11 +94,12 @@ public class WritableECContainerProvider
public ContainerInfo getContainer(final long size,
ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
throws IOException, TimeoutException {
- int minimumPipelines = getMinimumPipelines(repConfig);
+ int maximumPipelines = getMaximumPipelines(repConfig);
+ int openPipelineCount = 0;
synchronized (this) {
- int openPipelineCount = pipelineManager.getPipelineCount(repConfig,
+ openPipelineCount = pipelineManager.getPipelineCount(repConfig,
Pipeline.PipelineState.OPEN);
- if (openPipelineCount < minimumPipelines) {
+ if (openPipelineCount < maximumPipelines) {
try {
return allocateContainer(repConfig, size, owner, excludeList);
} catch (IOException e) {
@@ -131,6 +132,7 @@ public class WritableECContainerProvider
|| !containerHasSpace(containerInfo, size)) {
existingPipelines.remove(pipelineIndex);
pipelineManager.closePipeline(pipeline, true);
+ openPipelineCount--;
} else {
if (containerIsExcluded(containerInfo, excludeList)) {
existingPipelines.remove(pipelineIndex);
@@ -143,6 +145,7 @@ public class WritableECContainerProvider
+ "container", e);
existingPipelines.remove(pipelineIndex);
pipelineManager.closePipeline(pipeline, true);
+ openPipelineCount--;
}
}
}
@@ -150,7 +153,12 @@ public class WritableECContainerProvider
// allocate a new one.
try {
synchronized (this) {
- return allocateContainer(repConfig, size, owner, excludeList);
+ if (openPipelineCount < maximumPipelines) {
+ return allocateContainer(repConfig, size, owner, excludeList);
+ }
+ throw new IOException("Unable to allocate a pipeline for "
+ + repConfig + " after trying all existing pipelines as the max "
+ + "limit has been reached and no pipelines where closed");
}
} catch (IOException e) {
LOG.error("Unable to allocate a container for {} after trying all "
@@ -159,7 +167,7 @@ public class WritableECContainerProvider
}
}
- private int getMinimumPipelines(ECReplicationConfig repConfig) {
+ private int getMaximumPipelines(ECReplicationConfig repConfig) {
final double factor = providerConfig.getPipelinePerVolumeFactor();
int volumeBasedCount = 0;
if (factor > 0) {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index f0245c4a0f..76225d07fd 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -68,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
@@ -221,7 +222,7 @@ public class TestWritableECContainerProvider {
allocatedContainers.add(container);
}
// We have the min limit of pipelines, but then exclude one. It should use
- // one of the existing rather than createing a new one, as the limit is
+ // one of the existing rather than creating a new one, as the limit is
// checked against all pipelines, not just the filtered list
ExcludeList exclude = new ExcludeList();
PipelineID excludedID = allocatedContainers
@@ -235,7 +236,7 @@ public class TestWritableECContainerProvider {
@ParameterizedTest
@MethodSource("policies")
- public void testNewPipelineCreatedIfAllPipelinesExcluded(
+ public void testNewPipelineNotCreatedIfAllPipelinesExcluded(
PipelineChoosePolicy policy) throws IOException, TimeoutException {
provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers = new HashSet<>();
@@ -244,20 +245,19 @@ public class TestWritableECContainerProvider {
1, repConfig, OWNER, new ExcludeList());
allocatedContainers.add(container);
}
- // We have the min limit of pipelines, but then exclude one. It should use
- // one of the existing rather than creating a new one, as the limit is
- // checked against all pipelines, not just the filtered list
+ // We have the min limit of pipelines, but then exclude all the associated
+ // containers.
ExcludeList exclude = new ExcludeList();
for (ContainerInfo c : allocatedContainers) {
exclude.addPipeline(c.getPipelineID());
}
- ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
- assertFalse(allocatedContainers.contains(c));
+ assertThrows(IOException.class, () -> provider.getContainer(
+ 1, repConfig, OWNER, exclude));
}
@ParameterizedTest
@MethodSource("policies")
- public void testNewPipelineCreatedIfAllContainersExcluded(
+ public void testNewPipelineNotCreatedIfAllContainersExcluded(
PipelineChoosePolicy policy) throws IOException, TimeoutException {
provider = createSubject(policy);
Set<ContainerInfo> allocatedContainers = new HashSet<>();
@@ -266,15 +266,13 @@ public class TestWritableECContainerProvider {
1, repConfig, OWNER, new ExcludeList());
allocatedContainers.add(container);
}
- // We have the min limit of pipelines, but then exclude one. It should use
- // one of the existing rather than createing a new one, as the limit is
- // checked against all pipelines, not just the filtered list
+ // We have the min limit of pipelines, but then exclude them all
ExcludeList exclude = new ExcludeList();
for (ContainerInfo c : allocatedContainers) {
exclude.addConatinerId(c.containerID());
}
- ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
- assertFalse(allocatedContainers.contains(c));
+ assertThrows(IOException.class, () -> provider.getContainer(
+ 1, repConfig, OWNER, exclude));
}
@ParameterizedTest
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]