This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1859d72b99 HDDS-8682. EC: Avoid O(n) array.remove(element) when filtering pipelines in WritableECContainerProvider (#4767)
1859d72b99 is described below

commit 1859d72b99c73ff2daaf8ec713ec8fd6911b555a
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu May 25 23:04:20 2023 +0100

    HDDS-8682. EC: Avoid O(n) array.remove(element) when filtering pipelines in WritableECContainerProvider (#4767)
---
 .../hadoop/hdds/scm/PipelineChoosePolicy.java      |  14 ++-
 .../scm/pipeline/WritableECContainerProvider.java  |  19 ++-
 .../algorithms/HealthyPipelineChoosePolicy.java    |  16 ++-
 .../algorithms/RandomPipelineChoosePolicy.java     |  15 ++-
 .../pipeline/TestWritableECContainerProvider.java  | 133 +++++++++++++--------
 .../TestPipelineChoosePolicyFactory.java           |   6 +
 6 files changed, 138 insertions(+), 65 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
index c829e2eab6..76439a7846 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java
@@ -30,8 +30,20 @@ public interface PipelineChoosePolicy {
    * Given an initial list of pipelines, return one of the pipelines.
    *
    * @param pipelineList list of pipelines.
-   * @return one of the pipelines.
+   * @return one of the pipelines or null if no pipeline can be selected.
    */
   Pipeline choosePipeline(List<Pipeline> pipelineList,
       PipelineRequestInformation pri);
+
+  /**
+   * Given a list of pipelines, return the index of the chosen pipeline.
+   * @param pipelineList List of pipelines
+   * @param pri          PipelineRequestInformation
+   * @return Index in the list of the chosen pipeline, or -1 if no pipeline
+   *         could be selected.
+   */
+  default int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    return pipelineList == null || pipelineList.isEmpty() ? -1 : 0;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index 48173d1f63..48594b2faf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -87,7 +87,7 @@ public class WritableECContainerProvider
    * @param owner The owner of the container
    * @param excludeList A set of datanodes, container and pipelines which should
    *                    not be considered.
-   * @return A containerInfo representing a block group with with space for the
+   * @return A containerInfo representing a block group with space for the
    *         write, or null if no container can be allocated.
    */
   @Override
@@ -116,25 +116,24 @@ public class WritableECContainerProvider
             .setSize(size)
             .build();
     while (existingPipelines.size() > 0) {
-      Pipeline pipeline =
-          pipelineChoosePolicy.choosePipeline(existingPipelines, pri);
-      if (pipeline == null) {
+      int pipelineIndex =
+          pipelineChoosePolicy.choosePipelineIndex(existingPipelines, pri);
+      if (pipelineIndex < 0) {
         LOG.warn("Unable to select a pipeline from {} in the list",
             existingPipelines.size());
         break;
       }
+      Pipeline pipeline = existingPipelines.get(pipelineIndex);
       synchronized (pipeline.getId()) {
         try {
           ContainerInfo containerInfo = getContainerFromPipeline(pipeline);
           if (containerInfo == null
               || !containerHasSpace(containerInfo, size)) {
-            // This is O(n), which isn't great if there are a lot of pipelines
-            // and we keep finding pipelines without enough space.
-            existingPipelines.remove(pipeline);
+            existingPipelines.remove(pipelineIndex);
             pipelineManager.closePipeline(pipeline, true);
           } else {
             if (containerIsExcluded(containerInfo, excludeList)) {
-              existingPipelines.remove(pipeline);
+              existingPipelines.remove(pipelineIndex);
             } else {
               return containerInfo;
             }
@@ -142,13 +141,13 @@ public class WritableECContainerProvider
         } catch (PipelineNotFoundException | ContainerNotFoundException e) {
           LOG.warn("Pipeline or container not found when selecting a writable "
               + "container", e);
-          existingPipelines.remove(pipeline);
+          existingPipelines.remove(pipelineIndex);
           pipelineManager.closePipeline(pipeline, true);
         }
       }
     }
     // If we get here, all the pipelines we tried were no good. So try to
-    // allocate a new one and usePipelineManagerV2Impl.java it.
+    // allocate a new one.
     try {
       synchronized (this) {
         return allocateContainer(repConfig, size, owner, excludeList);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java
index 9f77caa682..678627a96e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java
@@ -17,23 +17,27 @@
 
 package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
 
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
  * The healthy pipeline choose policy that chooses pipeline
  * until return healthy pipeline.
  */
-public class HealthyPipelineChoosePolicy extends RandomPipelineChoosePolicy {
+public class HealthyPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private PipelineChoosePolicy randomPolicy = new RandomPipelineChoosePolicy();
 
   @Override
   public Pipeline choosePipeline(List<Pipeline> pipelineList,
       PipelineRequestInformation pri) {
     Pipeline fallback = null;
     while (pipelineList.size() > 0) {
-      Pipeline pipeline = super.choosePipeline(pipelineList, pri);
+      Pipeline pipeline = randomPolicy.choosePipeline(pipelineList, pri);
       if (pipeline.isHealthy()) {
         return pipeline;
       } else {
@@ -43,4 +47,12 @@ public class HealthyPipelineChoosePolicy extends RandomPipelineChoosePolicy {
     }
     return fallback;
   }
+
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+    Pipeline pipeline = choosePipeline(mutableList, pri);
+    return pipelineList.indexOf(pipeline);
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java
index 457e391abb..ea6a0ee70e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java
@@ -34,6 +34,19 @@ public class RandomPipelineChoosePolicy implements PipelineChoosePolicy {
   @SuppressWarnings("java:S2245") // no need for secure random
   public Pipeline choosePipeline(List<Pipeline> pipelineList,
       PipelineRequestInformation pri) {
-    return pipelineList.get((int) (Math.random() * pipelineList.size()));
+    return pipelineList.get(choosePipelineIndex(pipelineList, pri));
+  }
+
+  /**
+   * Given a list of pipelines, return the index of the chosen pipeline.
+   * @param pipelineList List of pipelines
+   * @param pri          PipelineRequestInformation
+   * @return Index in the list of the chosen pipeline, or -1 if no pipeline
+   *         could be selected.
+   */
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    return (int) (Math.random() * pipelineList.size());
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index 37a91443fc..f0245c4a0f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -37,17 +37,21 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
 import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -76,8 +80,6 @@ public class TestWritableECContainerProvider {
   private PipelineManager pipelineManager;
   private final ContainerManager containerManager
       = Mockito.mock(ContainerManager.class);
-  private final PipelineChoosePolicy pipelineChoosingPolicy
-      = new HealthyPipelineChoosePolicy();
 
   private OzoneConfiguration conf;
   private DBStore dbStore;
@@ -89,6 +91,14 @@ public class TestWritableECContainerProvider {
   private Map<ContainerID, ContainerInfo> containers;
   private WritableECContainerProviderConfig providerConf;
 
+  @Parameterized.Parameters
+  public static Collection<PipelineChoosePolicy> policies() {
+    Collection<PipelineChoosePolicy> policies = new ArrayList<>();
+    policies.add(new RandomPipelineChoosePolicy());
+    policies.add(new HealthyPipelineChoosePolicy());
+    return policies;
+  }
+
   @BeforeEach
   public void setup() throws IOException {
     repConfig = new ECReplicationConfig(3, 2);
@@ -107,8 +117,6 @@ public class TestWritableECContainerProvider {
     pipelineManager =
         new MockPipelineManager(dbStore, scmhaManager, nodeManager);
 
-    provider = createSubject();
-
     Mockito.doAnswer(call -> {
       Pipeline pipeline = (Pipeline)call.getArguments()[2];
       ContainerInfo container = createContainer(pipeline,
@@ -126,20 +134,23 @@ public class TestWritableECContainerProvider {
 
   }
 
-  private WritableContainerProvider<ECReplicationConfig> createSubject() {
-    return createSubject(pipelineManager);
+  private WritableContainerProvider<ECReplicationConfig> createSubject(
+      PipelineChoosePolicy policy) {
+    return createSubject(pipelineManager, policy);
   }
 
   private WritableContainerProvider<ECReplicationConfig> createSubject(
-      PipelineManager customPipelineManager) {
+      PipelineManager customPipelineManager, PipelineChoosePolicy policy) {
     return new WritableECContainerProvider(providerConf, getMaxContainerSize(),
         nodeManager, customPipelineManager, containerManager,
-        pipelineChoosingPolicy);
+        policy);
   }
 
-  @Test
-  void testPipelinesCreatedBasedOnTotalDiskCount()
+  @ParameterizedTest
+  @MethodSource("policies")
+  void testPipelinesCreatedBasedOnTotalDiskCount(PipelineChoosePolicy policy)
       throws IOException, TimeoutException {
+    provider = createSubject(policy);
     providerConf.setMinimumPipelines(1);
     nodeManager.setNumHealthyVolumes(20);
 
@@ -149,9 +160,11 @@ public class TestWritableECContainerProvider {
     assertReusesExisting(allocated, pipelineLimit);
   }
 
-  @Test
-  void testPipelinesCreatedBasedOnTotalDiskCountWithFactor()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  void testPipelinesCreatedBasedOnTotalDiskCountWithFactor(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
+    provider = createSubject(policy);
     int factor = 10;
     providerConf.setMinimumPipelines(1);
     providerConf.setPipelinePerVolumeFactor(factor);
@@ -163,9 +176,11 @@ public class TestWritableECContainerProvider {
     assertReusesExisting(allocated, pipelineLimit);
   }
 
-  @Test
-  void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
+    provider = createSubject(policy);
     int minimumPipelines = providerConf.getMinimumPipelines();
     Set<ContainerInfo> allocated = assertDistinctContainers(minimumPipelines);
     assertReusesExisting(allocated, minimumPipelines);
@@ -194,9 +209,11 @@ public class TestWritableECContainerProvider {
     }
   }
 
-  @Test
-  public void testPiplineLimitIgnoresExcludedPipelines()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testPiplineLimitIgnoresExcludedPipelines(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
+    provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
     for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
@@ -216,9 +233,11 @@ public class TestWritableECContainerProvider {
     assertTrue(allocatedContainers.contains(c));
   }
 
-  @Test
-  public void testNewPipelineCreatedIfAllPipelinesExcluded()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testNewPipelineCreatedIfAllPipelinesExcluded(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
+    provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
     for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
@@ -236,9 +255,11 @@ public class TestWritableECContainerProvider {
     assertFalse(allocatedContainers.contains(c));
   }
 
-  @Test
-  public void testNewPipelineCreatedIfAllContainersExcluded()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testNewPipelineCreatedIfAllContainersExcluded(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
+    provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
     for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
@@ -256,9 +277,10 @@ public class TestWritableECContainerProvider {
     assertFalse(allocatedContainers.contains(c));
   }
 
-  @Test
-  public void testUnableToCreateAnyPipelinesThrowsException()
-      throws IOException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testUnableToCreateAnyPipelinesThrowsException(
+      PipelineChoosePolicy policy) throws IOException {
     pipelineManager = new MockPipelineManager(
         dbStore, scmhaManager, nodeManager) {
       @Override
@@ -268,7 +290,7 @@ public class TestWritableECContainerProvider {
         throw new IOException("Cannot create pipelines");
       }
     };
-    provider = createSubject();
+    provider = createSubject(policy);
 
     try {
       provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -278,9 +300,10 @@ public class TestWritableECContainerProvider {
     }
   }
 
-  @Test
-  public void testExistingPipelineReturnedWhenNewCannotBeCreated()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testExistingPipelineReturnedWhenNewCannotBeCreated(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
     pipelineManager = new MockPipelineManager(
         dbStore, scmhaManager, nodeManager) {
 
@@ -298,7 +321,7 @@ public class TestWritableECContainerProvider {
         return super.createPipeline(repConfig);
       }
     };
-    provider = createSubject();
+    provider = createSubject(policy);
 
     try {
       provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -317,9 +340,11 @@ public class TestWritableECContainerProvider {
     }
   }
 
-  @Test
-  public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
+    provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers =
         assertDistinctContainers(providerConf.getMinimumPipelines());
     // Update all the containers to make them nearly full, but with enough space
@@ -349,9 +374,10 @@ public class TestWritableECContainerProvider {
     }
   }
 
-  @Test
-  public void testPipelineNotFoundWhenAttemptingToUseExisting()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testPipelineNotFoundWhenAttemptingToUseExisting(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
     // Ensure PM throws PNF exception when we ask for the containers in the
     // pipeline
     pipelineManager = new MockPipelineManager(
@@ -363,7 +389,7 @@ public class TestWritableECContainerProvider {
         throw new PipelineNotFoundException("Simulated exception");
       }
     };
-    provider = createSubject();
+    provider = createSubject(policy);
 
     Set<ContainerInfo> allocatedContainers =
         assertDistinctContainers(providerConf.getMinimumPipelines());
@@ -376,9 +402,11 @@ public class TestWritableECContainerProvider {
     assertFalse(allocatedContainers.contains(newContainer));
   }
 
-  @Test
-  public void testContainerNotFoundWhenAttemptingToUseExisting()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testContainerNotFoundWhenAttemptingToUseExisting(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
+    provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers =
         assertDistinctContainers(providerConf.getMinimumPipelines());
 
@@ -400,12 +428,14 @@ public class TestWritableECContainerProvider {
     }
   }
 
-  @Test
-  public void testPipelineOpenButContainerRemovedFromIt()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testPipelineOpenButContainerRemovedFromIt(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
     // This can happen if the container close process is triggered from the DN.
     // When tha happens, CM will change the container state to CLOSING and
     // remove it from the container list in pipeline Manager.
+    provider = createSubject(policy);
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
     for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
@@ -425,11 +455,12 @@ public class TestWritableECContainerProvider {
     }
   }
 
-  @Test
-  public void testExcludedNodesPassedToCreatePipelineIfProvided()
-      throws IOException, TimeoutException {
+  @ParameterizedTest
+  @MethodSource("policies")
+  public void testExcludedNodesPassedToCreatePipelineIfProvided(
+      PipelineChoosePolicy policy) throws IOException, TimeoutException {
     PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
-    provider = createSubject(pipelineManagerSpy);
+    provider = createSubject(pipelineManagerSpy, policy);
     ExcludeList excludeList = new ExcludeList();
 
     // EmptyList should be passed if there are no nodes excluded.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
index 17aec52137..53bd2f0044 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java
@@ -71,6 +71,12 @@ public class TestPipelineChoosePolicyFactory {
         PipelineRequestInformation pri) {
       return null;
     }
+
+    @Override
+    public int choosePipelineIndex(List<Pipeline> pipelineList,
+        PipelineRequestInformation pri) {
+      return -1;
+    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to