This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a34accd12e HDDS-8698. EC: Avoid unbounded pipeline creation if no 
existing pipelines meet criteria (#4829)
a34accd12e is described below

commit a34accd12e5494a5f807a25b2fe77cd19886014f
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Jun 5 20:25:35 2023 +0100

    HDDS-8698. EC: Avoid unbounded pipeline creation if no existing pipelines 
meet criteria (#4829)
---
 .../scm/pipeline/WritableECContainerProvider.java  | 18 +++++++++++-----
 .../pipeline/TestWritableECContainerProvider.java  | 24 ++++++++++------------
 2 files changed, 24 insertions(+), 18 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index 48594b2faf..47a7a32ab5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -94,11 +94,12 @@ public class WritableECContainerProvider
   public ContainerInfo getContainer(final long size,
       ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
       throws IOException, TimeoutException {
-    int minimumPipelines = getMinimumPipelines(repConfig);
+    int maximumPipelines = getMaximumPipelines(repConfig);
+    int openPipelineCount = 0;
     synchronized (this) {
-      int openPipelineCount = pipelineManager.getPipelineCount(repConfig,
+      openPipelineCount = pipelineManager.getPipelineCount(repConfig,
           Pipeline.PipelineState.OPEN);
-      if (openPipelineCount < minimumPipelines) {
+      if (openPipelineCount < maximumPipelines) {
         try {
           return allocateContainer(repConfig, size, owner, excludeList);
         } catch (IOException e) {
@@ -131,6 +132,7 @@ public class WritableECContainerProvider
               || !containerHasSpace(containerInfo, size)) {
             existingPipelines.remove(pipelineIndex);
             pipelineManager.closePipeline(pipeline, true);
+            openPipelineCount--;
           } else {
             if (containerIsExcluded(containerInfo, excludeList)) {
               existingPipelines.remove(pipelineIndex);
@@ -143,6 +145,7 @@ public class WritableECContainerProvider
               + "container", e);
           existingPipelines.remove(pipelineIndex);
           pipelineManager.closePipeline(pipeline, true);
+          openPipelineCount--;
         }
       }
     }
@@ -150,7 +153,12 @@ public class WritableECContainerProvider
     // allocate a new one.
     try {
       synchronized (this) {
-        return allocateContainer(repConfig, size, owner, excludeList);
+        if (openPipelineCount < maximumPipelines) {
+          return allocateContainer(repConfig, size, owner, excludeList);
+        }
+        throw new IOException("Unable to allocate a pipeline for "
+            + repConfig + " after trying all existing pipelines as the max "
+            + "limit has been reached and no pipelines where closed");
       }
     } catch (IOException e) {
       LOG.error("Unable to allocate a container for {} after trying all "
@@ -159,7 +167,7 @@ public class WritableECContainerProvider
     }
   }
 
-  private int getMinimumPipelines(ECReplicationConfig repConfig) {
+  private int getMaximumPipelines(ECReplicationConfig repConfig) {
     final double factor = providerConfig.getPipelinePerVolumeFactor();
     int volumeBasedCount = 0;
     if (factor > 0) {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index f0245c4a0f..76225d07fd 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -68,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.verify;
 
@@ -221,7 +222,7 @@ public class TestWritableECContainerProvider {
       allocatedContainers.add(container);
     }
     // We have the min limit of pipelines, but then exclude one. It should use
-    // one of the existing rather than createing a new one, as the limit is
+    // one of the existing rather than creating a new one, as the limit is
     // checked against all pipelines, not just the filtered list
     ExcludeList exclude = new ExcludeList();
     PipelineID excludedID = allocatedContainers
@@ -235,7 +236,7 @@ public class TestWritableECContainerProvider {
 
   @ParameterizedTest
   @MethodSource("policies")
-  public void testNewPipelineCreatedIfAllPipelinesExcluded(
+  public void testNewPipelineNotCreatedIfAllPipelinesExcluded(
       PipelineChoosePolicy policy) throws IOException, TimeoutException {
     provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
@@ -244,20 +245,19 @@ public class TestWritableECContainerProvider {
           1, repConfig, OWNER, new ExcludeList());
       allocatedContainers.add(container);
     }
-    // We have the min limit of pipelines, but then exclude one. It should use
-    // one of the existing rather than creating a new one, as the limit is
-    // checked against all pipelines, not just the filtered list
+    // We have the min limit of pipelines, but then exclude all the associated
+    // containers.
     ExcludeList exclude = new ExcludeList();
     for (ContainerInfo c : allocatedContainers) {
       exclude.addPipeline(c.getPipelineID());
     }
-    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
-    assertFalse(allocatedContainers.contains(c));
+    assertThrows(IOException.class, () -> provider.getContainer(
+        1, repConfig, OWNER, exclude));
   }
 
   @ParameterizedTest
   @MethodSource("policies")
-  public void testNewPipelineCreatedIfAllContainersExcluded(
+  public void testNewPipelineNotCreatedIfAllContainersExcluded(
       PipelineChoosePolicy policy) throws IOException, TimeoutException {
     provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
@@ -266,15 +266,13 @@ public class TestWritableECContainerProvider {
           1, repConfig, OWNER, new ExcludeList());
       allocatedContainers.add(container);
     }
-    // We have the min limit of pipelines, but then exclude one. It should use
-    // one of the existing rather than createing a new one, as the limit is
-    // checked against all pipelines, not just the filtered list
+    // We have the min limit of pipelines, but then exclude them all
     ExcludeList exclude = new ExcludeList();
     for (ContainerInfo c : allocatedContainers) {
       exclude.addConatinerId(c.containerID());
     }
-    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
-    assertFalse(allocatedContainers.contains(c));
+    assertThrows(IOException.class, () -> provider.getContainer(
+        1, repConfig, OWNER, exclude));
   }
 
   @ParameterizedTest


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to