This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 54a393a4d4 HDDS-8674. Allow more EC pipelines based on number of volumes (#4758)
54a393a4d4 is described below

commit 54a393a4d4718f2ffd7972644641a972342e66d7
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed May 24 11:32:39 2023 +0200

    HDDS-8674. Allow more EC pipelines based on number of volumes (#4758)
---
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   2 +
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   9 ++
 .../scm/pipeline/WritableContainerFactory.java     |  25 +++-
 .../scm/pipeline/WritableECContainerProvider.java  |  81 ++++++++---
 .../hadoop/hdds/scm/container/MockNodeManager.java |   5 +
 .../hdds/scm/container/SimpleMockNodeManager.java  |   5 +
 .../pipeline/TestWritableECContainerProvider.java  | 156 ++++++++++++---------
 .../testutils/ReplicationNodeManagerMock.java      |   5 +
 8 files changed, 194 insertions(+), 94 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 3601c3230d..ac38166fc6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -383,6 +383,8 @@ public interface NodeManager extends StorageContainerNodeProtocol,
 
   int minHealthyVolumeNum(List <DatanodeDetails> dnList);
 
+  int totalHealthyVolumeCount();
+
   int pipelineLimit(DatanodeDetails dn);
 
   int minPipelineLimit(List<DatanodeDetails> dn);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 9141a9b442..edf226f437 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -1132,6 +1132,15 @@ public class SCMNodeManager implements NodeManager {
     return Collections.min(volumeCountList);
   }
 
+  @Override
+  public int totalHealthyVolumeCount() {
+    int sum = 0;
+    for (DatanodeInfo dn : nodeStateManager.getNodes(IN_SERVICE, HEALTHY)) {
+      sum += dn.getHealthyVolumeCount();
+    }
+    return sum;
+  }
+
   /**
    * Returns the pipeline limit for the datanode.
    * if the datanode pipeline limit is set, consider that as the max
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
index 4c2fe4d8c5..b8eae8b6d1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
@@ -20,13 +20,19 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+
 /**
  * Factory class to obtain a container to which a block can be allocated for
  * write.
@@ -38,12 +44,19 @@ public class WritableContainerFactory {
   private final WritableContainerProvider<ECReplicationConfig> ecProvider;
 
   public WritableContainerFactory(StorageContainerManager scm) {
+    ConfigurationSource conf = scm.getConfiguration();
+
     this.ratisProvider = new WritableRatisContainerProvider(
-        scm.getConfiguration(), scm.getPipelineManager(),
+        conf, scm.getPipelineManager(),
         scm.getContainerManager(), scm.getPipelineChoosePolicy());
     this.standaloneProvider = ratisProvider;
-    this.ecProvider = new WritableECContainerProvider(scm.getConfiguration(),
-        scm.getPipelineManager(), scm.getContainerManager(),
+
+    this.ecProvider = new WritableECContainerProvider(
+        conf.getObject(WritableECContainerProviderConfig.class),
+        getConfiguredContainerSize(conf),
+        scm.getScmNodeManager(),
+        scm.getPipelineManager(),
+        scm.getContainerManager(),
         scm.getPipelineChoosePolicy());
   }
 
@@ -64,4 +77,10 @@ public class WritableContainerFactory {
           + " is an invalid replication type");
     }
   }
+
+  private long getConfiguredContainerSize(ConfigurationSource conf) {
+    return (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
+        OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index 7833f150f4..48173d1f63 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -24,16 +24,16 @@ import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.PostConstruct;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +44,7 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
 
 /**
  * Writable Container provider to obtain a writable container for EC pipelines.
@@ -55,23 +55,25 @@ public class WritableECContainerProvider
   private static final Logger LOG = LoggerFactory
       .getLogger(WritableECContainerProvider.class);
 
-  private final ConfigurationSource conf;
+  private final NodeManager nodeManager;
   private final PipelineManager pipelineManager;
   private final PipelineChoosePolicy pipelineChoosePolicy;
   private final ContainerManager containerManager;
   private final long containerSize;
   private final WritableECContainerProviderConfig providerConfig;
 
-  public WritableECContainerProvider(ConfigurationSource conf,
-      PipelineManager pipelineManager, ContainerManager containerManager,
+  public WritableECContainerProvider(WritableECContainerProviderConfig config,
+      long containerSize,
+      NodeManager nodeManager,
+      PipelineManager pipelineManager,
+      ContainerManager containerManager,
       PipelineChoosePolicy pipelineChoosePolicy) {
-    this.conf = conf;
-    this.providerConfig =
-        conf.getObject(WritableECContainerProviderConfig.class);
+    this.providerConfig = config;
+    this.nodeManager = nodeManager;
     this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
     this.pipelineChoosePolicy = pipelineChoosePolicy;
-    this.containerSize = getConfiguredContainerSize();
+    this.containerSize = containerSize;
   }
 
   /**
@@ -87,19 +89,18 @@ public class WritableECContainerProvider
    *                    not be considered.
    * @return A containerInfo representing a block group with with space for the
    *         write, or null if no container can be allocated.
-   * @throws IOException
    */
   @Override
   public ContainerInfo getContainer(final long size,
       ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
       throws IOException, TimeoutException {
+    int minimumPipelines = getMinimumPipelines(repConfig);
     synchronized (this) {
       int openPipelineCount = pipelineManager.getPipelineCount(repConfig,
           Pipeline.PipelineState.OPEN);
-      if (openPipelineCount < providerConfig.getMinimumPipelines()) {
+      if (openPipelineCount < minimumPipelines) {
         try {
-          return allocateContainer(
-              repConfig, size, owner, excludeList);
+          return allocateContainer(repConfig, size, owner, excludeList);
         } catch (IOException e) {
           LOG.warn("Unable to allocate a container for {} with {} existing "
               + "containers", repConfig, openPipelineCount, e);
@@ -159,6 +160,16 @@ public class WritableECContainerProvider
     }
   }
 
+  // Larger of the volume-based pipeline count and the configured minimum.
+  private int getMinimumPipelines(ECReplicationConfig repConfig) {
+    final double factor = providerConfig.getPipelinePerVolumeFactor();
+    // Cast the whole quotient: casting factor alone truncates e.g. 0.5 to 0.
+    final int volumeBasedCount = factor > 0
+        ? (int) (factor * nodeManager.totalHealthyVolumeCount()
+            / repConfig.getRequiredNodes()) : 0;
+    return Math.max(volumeBasedCount, providerConfig.getMinimumPipelines());
+  }
+
   private ContainerInfo allocateContainer(ReplicationConfig repConfig,
       long size, String owner, ExcludeList excludeList)
       throws IOException, TimeoutException {
@@ -205,18 +216,14 @@ public class WritableECContainerProvider
     return container.getUsedBytes() + size <= containerSize;
   }
 
-  private long getConfiguredContainerSize() {
-    return (long) conf.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
-  }
-
   /**
    * Class to hold configuration for WriteableECContainerProvider.
    */
-  @ConfigGroup(prefix = "ozone.scm.ec")
+  @ConfigGroup(prefix = WritableECContainerProviderConfig.PREFIX)
   public static class WritableECContainerProviderConfig {
 
+    private static final String PREFIX = "ozone.scm.ec";
+
     @Config(key = "pipeline.minimum",
         defaultValue = "5",
         type = ConfigType.INT,
@@ -233,6 +240,38 @@ public class WritableECContainerProvider
       this.minimumPipelines = minPipelines;
     }
 
+    private static final String PIPELINE_PER_VOLUME_FACTOR_KEY =
+        "pipeline.per.volume.factor";
+    private static final double PIPELINE_PER_VOLUME_FACTOR_DEFAULT = 1;
+    private static final String PIPELINE_PER_VOLUME_FACTOR_DEFAULT_VALUE = "1";
+    private static final String EC_PIPELINE_PER_VOLUME_FACTOR_KEY =
+        PREFIX + "." + PIPELINE_PER_VOLUME_FACTOR_KEY;
+
+    @Config(key = PIPELINE_PER_VOLUME_FACTOR_KEY,
+        type = ConfigType.DOUBLE,
+        defaultValue = PIPELINE_PER_VOLUME_FACTOR_DEFAULT_VALUE,
+        tags = {SCM},
+        description = "TODO"
+    )
+    private double pipelinePerVolumeFactor = PIPELINE_PER_VOLUME_FACTOR_DEFAULT;
+
+    public double getPipelinePerVolumeFactor() {
+      return pipelinePerVolumeFactor;
+    }
+
+    @PostConstruct
+    public void validate() {
+      if (pipelinePerVolumeFactor < 0) {
+        LOG.warn("{} must be non-negative, but was {}. Defaulting to {}",
+            EC_PIPELINE_PER_VOLUME_FACTOR_KEY, pipelinePerVolumeFactor,
+            PIPELINE_PER_VOLUME_FACTOR_DEFAULT);
+        pipelinePerVolumeFactor = PIPELINE_PER_VOLUME_FACTOR_DEFAULT;
+      }
+    }
+
+    public void setPipelinePerVolumeFactor(double v) {
+      pipelinePerVolumeFactor = v;
+    }
   }
 
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index a2d4f09583..ae37161f36 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -901,6 +901,11 @@ public class MockNodeManager implements NodeManager {
     return numHealthyDisksPerDatanode;
   }
 
+  @Override
+  public int totalHealthyVolumeCount() {
+    return healthyNodes.size() * numHealthyDisksPerDatanode;
+  }
+
   @Override
   public int pipelineLimit(DatanodeDetails dn) {
     // by default 1 single node pipeline and 1 three node pipeline
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 76f5f56a68..8c3d027d1d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -366,6 +366,11 @@ public class SimpleMockNodeManager implements NodeManager {
     return 0;
   }
 
+  @Override
+  public int totalHealthyVolumeCount() {
+    return 0;
+  }
+
   @Override
   public int pipelineLimit(DatanodeDetails dn) {
     return 1;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index 3296188707..37a91443fc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
 import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
@@ -43,10 +43,7 @@ import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Matchers;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -63,7 +60,6 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
 import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
-import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -76,33 +72,30 @@ import static org.mockito.Mockito.verify;
  */
 public class TestWritableECContainerProvider {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestWritableECContainerProvider.class);
   private static final String OWNER = "SCM";
   private PipelineManager pipelineManager;
-  private ContainerManager containerManager
+  private final ContainerManager containerManager
       = Mockito.mock(ContainerManager.class);
-  private PipelineChoosePolicy pipelineChoosingPolicy
+  private final PipelineChoosePolicy pipelineChoosingPolicy
       = new HealthyPipelineChoosePolicy();
 
   private OzoneConfiguration conf;
   private DBStore dbStore;
   private SCMHAManager scmhaManager;
-  private NodeManager nodeManager;
-  private WritableContainerProvider provider;
-  private ReplicationConfig repConfig;
-  private int minPipelines;
+  private MockNodeManager nodeManager;
+  private WritableContainerProvider<ECReplicationConfig> provider;
+  private ECReplicationConfig repConfig;
 
   private Map<ContainerID, ContainerInfo> containers;
+  private WritableECContainerProviderConfig providerConf;
 
   @BeforeEach
   public void setup() throws IOException {
     repConfig = new ECReplicationConfig(3, 2);
     conf = new OzoneConfiguration();
-    WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
-        conf.getObject(WritableECContainerProvider
-            .WritableECContainerProviderConfig.class);
-    minPipelines = providerConf.getMinimumPipelines();
+
+    providerConf = conf.getObject(WritableECContainerProviderConfig.class);
+
     containers = new HashMap<>();
     File testDir = GenericTestUtils.getTestDir(
         TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
@@ -113,8 +106,8 @@ public class TestWritableECContainerProvider {
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
         new MockPipelineManager(dbStore, scmhaManager, nodeManager);
-    provider = new WritableECContainerProvider(
-        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    provider = createSubject();
 
     Mockito.doAnswer(call -> {
       Pipeline pipeline = (Pipeline)call.getArguments()[2];
@@ -124,47 +117,88 @@ public class TestWritableECContainerProvider {
           pipeline.getId(), container.containerID());
       containers.put(container.containerID(), container);
       return container;
-    }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
-        Matchers.anyString(), Matchers.any(Pipeline.class));
+    }).when(containerManager).getMatchingContainer(Mockito.anyLong(),
+        Mockito.anyString(), Mockito.any(Pipeline.class));
 
     Mockito.doAnswer(call ->
         containers.get((ContainerID)call.getArguments()[0]))
-        .when(containerManager).getContainer(Matchers.any(ContainerID.class));
+        .when(containerManager).getContainer(Mockito.any(ContainerID.class));
 
   }
 
+  private WritableContainerProvider<ECReplicationConfig> createSubject() {
+    return createSubject(pipelineManager);
+  }
+
+  private WritableContainerProvider<ECReplicationConfig> createSubject(
+      PipelineManager customPipelineManager) {
+    return new WritableECContainerProvider(providerConf, getMaxContainerSize(),
+        nodeManager, customPipelineManager, containerManager,
+        pipelineChoosingPolicy);
+  }
+
   @Test
-  public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+  void testPipelinesCreatedBasedOnTotalDiskCount()
+      throws IOException, TimeoutException {
+    providerConf.setMinimumPipelines(1);
+    nodeManager.setNumHealthyVolumes(20);
+
+    int volumeCount = nodeManager.totalHealthyVolumeCount();
+    int pipelineLimit = volumeCount / repConfig.getRequiredNodes();
+    Set<ContainerInfo> allocated = assertDistinctContainers(pipelineLimit);
+    assertReusesExisting(allocated, pipelineLimit);
+  }
+
+  @Test
+  void testPipelinesCreatedBasedOnTotalDiskCountWithFactor()
+      throws IOException, TimeoutException {
+    int factor = 10;
+    providerConf.setMinimumPipelines(1);
+    providerConf.setPipelinePerVolumeFactor(factor);
+    nodeManager.setNumHealthyVolumes(5);
+
+    int volumeCount = nodeManager.totalHealthyVolumeCount();
+    int pipelineLimit = factor * volumeCount / repConfig.getRequiredNodes();
+    Set<ContainerInfo> allocated = assertDistinctContainers(pipelineLimit);
+    assertReusesExisting(allocated, pipelineLimit);
+  }
+
+  @Test
+  void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+      throws IOException, TimeoutException {
+    int minimumPipelines = providerConf.getMinimumPipelines();
+    Set<ContainerInfo> allocated = assertDistinctContainers(minimumPipelines);
+    assertReusesExisting(allocated, minimumPipelines);
+  }
+
+  private Set<ContainerInfo> assertDistinctContainers(int n)
       throws IOException, TimeoutException {
-    // The first 5 calls should return a different container
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
+    for (int i = 0; i < n; i++) {
       ContainerInfo container =
           provider.getContainer(1, repConfig, OWNER, new ExcludeList());
-      assertFalse(allocatedContainers.contains(container));
+      assertFalse(allocatedContainers.contains(container),
+          "Provided existing container for request " + i);
       allocatedContainers.add(container);
     }
+    return allocatedContainers;
+  }
 
-    allocatedContainers.clear();
-    for (int i = 0; i < 20; i++) {
+  private void assertReusesExisting(Set<ContainerInfo> existing, int n)
+      throws IOException, TimeoutException {
+    for (int i = 0; i < 3 * n; i++) {
       ContainerInfo container =
           provider.getContainer(1, repConfig, OWNER, new ExcludeList());
-      allocatedContainers.add(container);
+      assertTrue(existing.contains(container),
+          "Provided new container for request " + i);
     }
-    // Should have minPipelines containers created
-    assertEquals(minPipelines,
-        pipelineManager.getPipelines(repConfig, OPEN).size());
-    // We should have more than 1 allocatedContainers in the set proving a
-    // random container is selected each time. Do not check for 5 here as there
-    // is a reasonable chance that in 20 turns we don't pick all 5 nodes.
-    assertTrue(allocatedContainers.size() > 2);
   }
 
   @Test
   public void testPiplineLimitIgnoresExcludedPipelines()
       throws IOException, TimeoutException {
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
+    for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
           1, repConfig, OWNER, new ExcludeList());
       allocatedContainers.add(container);
@@ -186,7 +220,7 @@ public class TestWritableECContainerProvider {
   public void testNewPipelineCreatedIfAllPipelinesExcluded()
       throws IOException, TimeoutException {
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
+    for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
           1, repConfig, OWNER, new ExcludeList());
       allocatedContainers.add(container);
@@ -206,7 +240,7 @@ public class TestWritableECContainerProvider {
   public void testNewPipelineCreatedIfAllContainersExcluded()
       throws IOException, TimeoutException {
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
+    for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
           1, repConfig, OWNER, new ExcludeList());
       allocatedContainers.add(container);
@@ -234,8 +268,7 @@ public class TestWritableECContainerProvider {
         throw new IOException("Cannot create pipelines");
       }
     };
-    provider = new WritableECContainerProvider(
-        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+    provider = createSubject();
 
     try {
       provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -265,8 +298,7 @@ public class TestWritableECContainerProvider {
         return super.createPipeline(repConfig);
       }
     };
-    provider = new WritableECContainerProvider(
-        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+    provider = createSubject();
 
     try {
       provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -288,13 +320,8 @@ public class TestWritableECContainerProvider {
   @Test
   public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
       throws IOException, TimeoutException {
-    Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
-      ContainerInfo container =
-          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
-      assertFalse(allocatedContainers.contains(container));
-      allocatedContainers.add(container);
-    }
+    Set<ContainerInfo> allocatedContainers =
+        assertDistinctContainers(providerConf.getMinimumPipelines());
     // Update all the containers to make them nearly full, but with enough space
     // for an EC block to be striped across them.
     for (ContainerInfo c : allocatedContainers) {
@@ -336,16 +363,11 @@ public class TestWritableECContainerProvider {
         throw new PipelineNotFoundException("Simulated exception");
       }
     };
-    provider = new WritableECContainerProvider(
-        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+    provider = createSubject();
+
+    Set<ContainerInfo> allocatedContainers =
+        assertDistinctContainers(providerConf.getMinimumPipelines());
 
-    Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
-      ContainerInfo container =
-          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
-      assertFalse(allocatedContainers.contains(container));
-      allocatedContainers.add(container);
-    }
     // Now attempt to get a container - any attempt to use an existing with
     // throw PNF and then we must allocate a new one
     ContainerInfo newContainer =
@@ -357,19 +379,14 @@ public class TestWritableECContainerProvider {
   @Test
   public void testContainerNotFoundWhenAttemptingToUseExisting()
       throws IOException, TimeoutException {
-    Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
-      ContainerInfo container =
-          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
-      assertFalse(allocatedContainers.contains(container));
-      allocatedContainers.add(container);
-    }
+    Set<ContainerInfo> allocatedContainers =
+        assertDistinctContainers(providerConf.getMinimumPipelines());
 
     // Ensure ContainerManager always throws when a container is requested so
     // existing pipelines cannot be used
     Mockito.doAnswer(call -> {
       throw new ContainerNotFoundException();
-    }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
+    }).when(containerManager).getContainer(Mockito.any(ContainerID.class));
 
     ContainerInfo newContainer =
         provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -390,7 +407,7 @@ public class TestWritableECContainerProvider {
     // When tha happens, CM will change the container state to CLOSING and
     // remove it from the container list in pipeline Manager.
     Set<ContainerInfo> allocatedContainers = new HashSet<>();
-    for (int i = 0; i < minPipelines; i++) {
+    for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
       ContainerInfo container = provider.getContainer(
           1, repConfig, OWNER, new ExcludeList());
       assertFalse(allocatedContainers.contains(container));
@@ -412,8 +429,7 @@ public class TestWritableECContainerProvider {
   public void testExcludedNodesPassedToCreatePipelineIfProvided()
       throws IOException, TimeoutException {
     PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
-    provider = new WritableECContainerProvider(
-        conf, pipelineManagerSpy, containerManager, pipelineChoosingPolicy);
+    provider = createSubject(pipelineManagerSpy);
     ExcludeList excludeList = new ExcludeList();
 
     // EmptyList should be passed if there are no nodes excluded.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 3267b7a2a5..21defdb08b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -520,6 +520,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
     return 0;
   }
 
+  @Override
+  public int totalHealthyVolumeCount() {
+    return 0;
+  }
+
   @Override
   public int pipelineLimit(DatanodeDetails dn) {
     return 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to