This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 54a393a4d4 HDDS-8674. Allow more EC pipelines based on number of
volumes (#4758)
54a393a4d4 is described below
commit 54a393a4d4718f2ffd7972644641a972342e66d7
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed May 24 11:32:39 2023 +0200
HDDS-8674. Allow more EC pipelines based on number of volumes (#4758)
---
.../apache/hadoop/hdds/scm/node/NodeManager.java | 2 +
.../hadoop/hdds/scm/node/SCMNodeManager.java | 9 ++
.../scm/pipeline/WritableContainerFactory.java | 25 +++-
.../scm/pipeline/WritableECContainerProvider.java | 81 ++++++++---
.../hadoop/hdds/scm/container/MockNodeManager.java | 5 +
.../hdds/scm/container/SimpleMockNodeManager.java | 5 +
.../pipeline/TestWritableECContainerProvider.java | 156 ++++++++++++---------
.../testutils/ReplicationNodeManagerMock.java | 5 +
8 files changed, 194 insertions(+), 94 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 3601c3230d..ac38166fc6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -383,6 +383,8 @@ public interface NodeManager extends
StorageContainerNodeProtocol,
int minHealthyVolumeNum(List <DatanodeDetails> dnList);
+ int totalHealthyVolumeCount();
+
int pipelineLimit(DatanodeDetails dn);
int minPipelineLimit(List<DatanodeDetails> dn);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 9141a9b442..edf226f437 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -1132,6 +1132,15 @@ public class SCMNodeManager implements NodeManager {
return Collections.min(volumeCountList);
}
+ @Override
+ public int totalHealthyVolumeCount() {
+ int sum = 0;
+ for (DatanodeInfo dn : nodeStateManager.getNodes(IN_SERVICE, HEALTHY)) {
+ sum += dn.getHealthyVolumeCount();
+ }
+ return sum;
+ }
+
/**
* Returns the pipeline limit for the datanode.
* if the datanode pipeline limit is set, consider that as the max
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
index 4c2fe4d8c5..b8eae8b6d1 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
@@ -20,13 +20,19 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import
org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+
/**
* Factory class to obtain a container to which a block can be allocated for
* write.
@@ -38,12 +44,19 @@ public class WritableContainerFactory {
private final WritableContainerProvider<ECReplicationConfig> ecProvider;
public WritableContainerFactory(StorageContainerManager scm) {
+ ConfigurationSource conf = scm.getConfiguration();
+
this.ratisProvider = new WritableRatisContainerProvider(
- scm.getConfiguration(), scm.getPipelineManager(),
+ conf, scm.getPipelineManager(),
scm.getContainerManager(), scm.getPipelineChoosePolicy());
this.standaloneProvider = ratisProvider;
- this.ecProvider = new WritableECContainerProvider(scm.getConfiguration(),
- scm.getPipelineManager(), scm.getContainerManager(),
+
+ this.ecProvider = new WritableECContainerProvider(
+ conf.getObject(WritableECContainerProviderConfig.class),
+ getConfiguredContainerSize(conf),
+ scm.getScmNodeManager(),
+ scm.getPipelineManager(),
+ scm.getContainerManager(),
scm.getPipelineChoosePolicy());
}
@@ -64,4 +77,10 @@ public class WritableContainerFactory {
+ " is an invalid replication type");
}
}
+
+ private long getConfiguredContainerSize(ConfigurationSource conf) {
+ return (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
+ OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index 7833f150f4..48173d1f63 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -24,16 +24,16 @@ import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +44,7 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.TimeoutException;
-import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
/**
* Writable Container provider to obtain a writable container for EC pipelines.
@@ -55,23 +55,25 @@ public class WritableECContainerProvider
private static final Logger LOG = LoggerFactory
.getLogger(WritableECContainerProvider.class);
- private final ConfigurationSource conf;
+ private final NodeManager nodeManager;
private final PipelineManager pipelineManager;
private final PipelineChoosePolicy pipelineChoosePolicy;
private final ContainerManager containerManager;
private final long containerSize;
private final WritableECContainerProviderConfig providerConfig;
- public WritableECContainerProvider(ConfigurationSource conf,
- PipelineManager pipelineManager, ContainerManager containerManager,
+ public WritableECContainerProvider(WritableECContainerProviderConfig config,
+ long containerSize,
+ NodeManager nodeManager,
+ PipelineManager pipelineManager,
+ ContainerManager containerManager,
PipelineChoosePolicy pipelineChoosePolicy) {
- this.conf = conf;
- this.providerConfig =
- conf.getObject(WritableECContainerProviderConfig.class);
+ this.providerConfig = config;
+ this.nodeManager = nodeManager;
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
this.pipelineChoosePolicy = pipelineChoosePolicy;
- this.containerSize = getConfiguredContainerSize();
+ this.containerSize = containerSize;
}
/**
@@ -87,19 +89,18 @@ public class WritableECContainerProvider
* not be considered.
* @return A containerInfo representing a block group with space for the
* write, or null if no container can be allocated.
- * @throws IOException
*/
@Override
public ContainerInfo getContainer(final long size,
ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
throws IOException, TimeoutException {
+ int minimumPipelines = getMinimumPipelines(repConfig);
synchronized (this) {
int openPipelineCount = pipelineManager.getPipelineCount(repConfig,
Pipeline.PipelineState.OPEN);
- if (openPipelineCount < providerConfig.getMinimumPipelines()) {
+ if (openPipelineCount < minimumPipelines) {
try {
- return allocateContainer(
- repConfig, size, owner, excludeList);
+ return allocateContainer(repConfig, size, owner, excludeList);
} catch (IOException e) {
LOG.warn("Unable to allocate a container for {} with {} existing "
+ "containers", repConfig, openPipelineCount, e);
@@ -159,6 +160,16 @@ public class WritableECContainerProvider
}
}
+ /**
+ * Calculates how many open pipelines should exist for {@code repConfig}
+ * before existing pipelines are reused.
+ * <p>
+ * The volume-based target is {@code factor * totalHealthyVolumes /
+ * requiredNodes}, truncated to an int. When the factor is not positive the
+ * volume-based target stays at zero and only the configured minimum applies.
+ *
+ * @param repConfig EC replication config; its required node count divides
+ * the scaled volume count
+ * @return the larger of the volume-based target and the configured minimum
+ * number of pipelines
+ */
+ private int getMinimumPipelines(ECReplicationConfig repConfig) {
+ final double factor = providerConfig.getPipelinePerVolumeFactor();
+ int volumeBasedCount = 0;
+ if (factor > 0) {
+ int volumes = nodeManager.totalHealthyVolumeCount();
+ // Cast the whole expression. Casting only the factor would truncate a
+ // fractional factor (e.g. 0.5 -> 0) before it is multiplied.
+ volumeBasedCount =
+ (int) (factor * volumes / repConfig.getRequiredNodes());
+ }
+ return Math.max(volumeBasedCount, providerConfig.getMinimumPipelines());
+ }
+
private ContainerInfo allocateContainer(ReplicationConfig repConfig,
long size, String owner, ExcludeList excludeList)
throws IOException, TimeoutException {
@@ -205,18 +216,14 @@ public class WritableECContainerProvider
return container.getUsedBytes() + size <= containerSize;
}
- private long getConfiguredContainerSize() {
- return (long) conf.getStorageSize(
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
- }
-
/**
* Class to hold configuration for WriteableECContainerProvider.
*/
- @ConfigGroup(prefix = "ozone.scm.ec")
+ @ConfigGroup(prefix = WritableECContainerProviderConfig.PREFIX)
public static class WritableECContainerProviderConfig {
+ private static final String PREFIX = "ozone.scm.ec";
+
@Config(key = "pipeline.minimum",
defaultValue = "5",
type = ConfigType.INT,
@@ -233,6 +240,38 @@ public class WritableECContainerProvider
this.minimumPipelines = minPipelines;
}
+ private static final String PIPELINE_PER_VOLUME_FACTOR_KEY =
+ "pipeline.per.volume.factor";
+ private static final double PIPELINE_PER_VOLUME_FACTOR_DEFAULT = 1;
+ private static final String PIPELINE_PER_VOLUME_FACTOR_DEFAULT_VALUE = "1";
+ private static final String EC_PIPELINE_PER_VOLUME_FACTOR_KEY =
+ PREFIX + "." + PIPELINE_PER_VOLUME_FACTOR_KEY;
+
+ // Scales the cluster-wide healthy volume count into a target number of
+ // open EC pipelines; see the description below for the formula.
+ @Config(key = PIPELINE_PER_VOLUME_FACTOR_KEY,
+ type = ConfigType.DOUBLE,
+ defaultValue = PIPELINE_PER_VOLUME_FACTOR_DEFAULT_VALUE,
+ tags = {SCM},
+ description = "Factor used to derive the number of open EC pipelines " +
+ "from the total number of healthy volumes in the cluster. The " +
+ "target is this factor multiplied by the healthy volume count, " +
+ "divided by the number of nodes required by the EC replication " +
+ "config. The larger of this target and " +
+ "ozone.scm.ec.pipeline.minimum is used. Set to 0 to disable the " +
+ "volume-based target. Negative values are replaced by the default."
+ )
+ private double pipelinePerVolumeFactor =
PIPELINE_PER_VOLUME_FACTOR_DEFAULT;
+
+ public double getPipelinePerVolumeFactor() {
+ return pipelinePerVolumeFactor;
+ }
+
+ @PostConstruct
+ public void validate() {
+ if (pipelinePerVolumeFactor < 0) {
+ LOG.warn("{} must be non-negative, but was {}. Defaulting to {}",
+ EC_PIPELINE_PER_VOLUME_FACTOR_KEY, pipelinePerVolumeFactor,
+ PIPELINE_PER_VOLUME_FACTOR_DEFAULT);
+ pipelinePerVolumeFactor = PIPELINE_PER_VOLUME_FACTOR_DEFAULT;
+ }
+ }
+
+ public void setPipelinePerVolumeFactor(double v) {
+ pipelinePerVolumeFactor = v;
+ }
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index a2d4f09583..ae37161f36 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -901,6 +901,11 @@ public class MockNodeManager implements NodeManager {
return numHealthyDisksPerDatanode;
}
+ @Override
+ public int totalHealthyVolumeCount() {
+ return healthyNodes.size() * numHealthyDisksPerDatanode;
+ }
+
@Override
public int pipelineLimit(DatanodeDetails dn) {
// by default 1 single node pipeline and 1 three node pipeline
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 76f5f56a68..8c3d027d1d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -366,6 +366,11 @@ public class SimpleMockNodeManager implements NodeManager {
return 0;
}
+ @Override
+ public int totalHealthyVolumeCount() {
+ return 0;
+ }
+
@Override
public int pipelineLimit(DatanodeDetails dn) {
return 1;
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index 3296188707..37a91443fc 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -35,7 +35,7 @@ import
org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
+import
org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
import
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
@@ -43,10 +43,7 @@ import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.Matchers;
import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -63,7 +60,6 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
-import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -76,33 +72,30 @@ import static org.mockito.Mockito.verify;
*/
public class TestWritableECContainerProvider {
- private static final Logger LOG = LoggerFactory
- .getLogger(TestWritableECContainerProvider.class);
private static final String OWNER = "SCM";
private PipelineManager pipelineManager;
- private ContainerManager containerManager
+ private final ContainerManager containerManager
= Mockito.mock(ContainerManager.class);
- private PipelineChoosePolicy pipelineChoosingPolicy
+ private final PipelineChoosePolicy pipelineChoosingPolicy
= new HealthyPipelineChoosePolicy();
private OzoneConfiguration conf;
private DBStore dbStore;
private SCMHAManager scmhaManager;
- private NodeManager nodeManager;
- private WritableContainerProvider provider;
- private ReplicationConfig repConfig;
- private int minPipelines;
+ private MockNodeManager nodeManager;
+ private WritableContainerProvider<ECReplicationConfig> provider;
+ private ECReplicationConfig repConfig;
private Map<ContainerID, ContainerInfo> containers;
+ private WritableECContainerProviderConfig providerConf;
@BeforeEach
public void setup() throws IOException {
repConfig = new ECReplicationConfig(3, 2);
conf = new OzoneConfiguration();
- WritableECContainerProvider.WritableECContainerProviderConfig providerConf
=
- conf.getObject(WritableECContainerProvider
- .WritableECContainerProviderConfig.class);
- minPipelines = providerConf.getMinimumPipelines();
+
+ providerConf = conf.getObject(WritableECContainerProviderConfig.class);
+
containers = new HashMap<>();
File testDir = GenericTestUtils.getTestDir(
TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
@@ -113,8 +106,8 @@ public class TestWritableECContainerProvider {
nodeManager = new MockNodeManager(true, 10);
pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
- provider = new WritableECContainerProvider(
- conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ provider = createSubject();
Mockito.doAnswer(call -> {
Pipeline pipeline = (Pipeline)call.getArguments()[2];
@@ -124,47 +117,88 @@ public class TestWritableECContainerProvider {
pipeline.getId(), container.containerID());
containers.put(container.containerID(), container);
return container;
- }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
- Matchers.anyString(), Matchers.any(Pipeline.class));
+ }).when(containerManager).getMatchingContainer(Mockito.anyLong(),
+ Mockito.anyString(), Mockito.any(Pipeline.class));
Mockito.doAnswer(call ->
containers.get((ContainerID)call.getArguments()[0]))
- .when(containerManager).getContainer(Matchers.any(ContainerID.class));
+ .when(containerManager).getContainer(Mockito.any(ContainerID.class));
}
+ private WritableContainerProvider<ECReplicationConfig> createSubject() {
+ return createSubject(pipelineManager);
+ }
+
+ private WritableContainerProvider<ECReplicationConfig> createSubject(
+ PipelineManager customPipelineManager) {
+ return new WritableECContainerProvider(providerConf, getMaxContainerSize(),
+ nodeManager, customPipelineManager, containerManager,
+ pipelineChoosingPolicy);
+ }
+
@Test
- public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+ void testPipelinesCreatedBasedOnTotalDiskCount()
+ throws IOException, TimeoutException {
+ providerConf.setMinimumPipelines(1);
+ nodeManager.setNumHealthyVolumes(20);
+
+ int volumeCount = nodeManager.totalHealthyVolumeCount();
+ int pipelineLimit = volumeCount / repConfig.getRequiredNodes();
+ Set<ContainerInfo> allocated = assertDistinctContainers(pipelineLimit);
+ assertReusesExisting(allocated, pipelineLimit);
+ }
+
+ @Test
+ void testPipelinesCreatedBasedOnTotalDiskCountWithFactor()
+ throws IOException, TimeoutException {
+ int factor = 10;
+ providerConf.setMinimumPipelines(1);
+ providerConf.setPipelinePerVolumeFactor(factor);
+ nodeManager.setNumHealthyVolumes(5);
+
+ int volumeCount = nodeManager.totalHealthyVolumeCount();
+ int pipelineLimit = factor * volumeCount / repConfig.getRequiredNodes();
+ Set<ContainerInfo> allocated = assertDistinctContainers(pipelineLimit);
+ assertReusesExisting(allocated, pipelineLimit);
+ }
+
+ @Test
+ void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+ throws IOException, TimeoutException {
+ int minimumPipelines = providerConf.getMinimumPipelines();
+ Set<ContainerInfo> allocated = assertDistinctContainers(minimumPipelines);
+ assertReusesExisting(allocated, minimumPipelines);
+ }
+
+ private Set<ContainerInfo> assertDistinctContainers(int n)
throws IOException, TimeoutException {
- // The first 5 calls should return a different container
Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
+ for (int i = 0; i < n; i++) {
ContainerInfo container =
provider.getContainer(1, repConfig, OWNER, new ExcludeList());
- assertFalse(allocatedContainers.contains(container));
+ assertFalse(allocatedContainers.contains(container),
+ "Provided existing container for request " + i);
allocatedContainers.add(container);
}
+ return allocatedContainers;
+ }
- allocatedContainers.clear();
- for (int i = 0; i < 20; i++) {
+ private void assertReusesExisting(Set<ContainerInfo> existing, int n)
+ throws IOException, TimeoutException {
+ for (int i = 0; i < 3 * n; i++) {
ContainerInfo container =
provider.getContainer(1, repConfig, OWNER, new ExcludeList());
- allocatedContainers.add(container);
+ assertTrue(existing.contains(container),
+ "Provided new container for request " + i);
}
- // Should have minPipelines containers created
- assertEquals(minPipelines,
- pipelineManager.getPipelines(repConfig, OPEN).size());
- // We should have more than 1 allocatedContainers in the set proving a
- // random container is selected each time. Do not check for 5 here as there
- // is a reasonable chance that in 20 turns we don't pick all 5 nodes.
- assertTrue(allocatedContainers.size() > 2);
}
@Test
public void testPiplineLimitIgnoresExcludedPipelines()
throws IOException, TimeoutException {
Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
+ for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
1, repConfig, OWNER, new ExcludeList());
allocatedContainers.add(container);
@@ -186,7 +220,7 @@ public class TestWritableECContainerProvider {
public void testNewPipelineCreatedIfAllPipelinesExcluded()
throws IOException, TimeoutException {
Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
+ for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
1, repConfig, OWNER, new ExcludeList());
allocatedContainers.add(container);
@@ -206,7 +240,7 @@ public class TestWritableECContainerProvider {
public void testNewPipelineCreatedIfAllContainersExcluded()
throws IOException, TimeoutException {
Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
+ for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
1, repConfig, OWNER, new ExcludeList());
allocatedContainers.add(container);
@@ -234,8 +268,7 @@ public class TestWritableECContainerProvider {
throw new IOException("Cannot create pipelines");
}
};
- provider = new WritableECContainerProvider(
- conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+ provider = createSubject();
try {
provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -265,8 +298,7 @@ public class TestWritableECContainerProvider {
return super.createPipeline(repConfig);
}
};
- provider = new WritableECContainerProvider(
- conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+ provider = createSubject();
try {
provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -288,13 +320,8 @@ public class TestWritableECContainerProvider {
@Test
public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
throws IOException, TimeoutException {
- Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
- ContainerInfo container =
- provider.getContainer(1, repConfig, OWNER, new ExcludeList());
- assertFalse(allocatedContainers.contains(container));
- allocatedContainers.add(container);
- }
+ Set<ContainerInfo> allocatedContainers =
+ assertDistinctContainers(providerConf.getMinimumPipelines());
// Update all the containers to make them nearly full, but with enough
space
// for an EC block to be striped across them.
for (ContainerInfo c : allocatedContainers) {
@@ -336,16 +363,11 @@ public class TestWritableECContainerProvider {
throw new PipelineNotFoundException("Simulated exception");
}
};
- provider = new WritableECContainerProvider(
- conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+ provider = createSubject();
+
+ Set<ContainerInfo> allocatedContainers =
+ assertDistinctContainers(providerConf.getMinimumPipelines());
- Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
- ContainerInfo container =
- provider.getContainer(1, repConfig, OWNER, new ExcludeList());
- assertFalse(allocatedContainers.contains(container));
- allocatedContainers.add(container);
- }
// Now attempt to get a container - any attempt to use an existing with
// throw PNF and then we must allocate a new one
ContainerInfo newContainer =
@@ -357,19 +379,14 @@ public class TestWritableECContainerProvider {
@Test
public void testContainerNotFoundWhenAttemptingToUseExisting()
throws IOException, TimeoutException {
- Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
- ContainerInfo container =
- provider.getContainer(1, repConfig, OWNER, new ExcludeList());
- assertFalse(allocatedContainers.contains(container));
- allocatedContainers.add(container);
- }
+ Set<ContainerInfo> allocatedContainers =
+ assertDistinctContainers(providerConf.getMinimumPipelines());
// Ensure ContainerManager always throws when a container is requested so
// existing pipelines cannot be used
Mockito.doAnswer(call -> {
throw new ContainerNotFoundException();
- }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
+ }).when(containerManager).getContainer(Mockito.any(ContainerID.class));
ContainerInfo newContainer =
provider.getContainer(1, repConfig, OWNER, new ExcludeList());
@@ -390,7 +407,7 @@ public class TestWritableECContainerProvider {
// When that happens, CM will change the container state to CLOSING and
// remove it from the container list in pipeline Manager.
Set<ContainerInfo> allocatedContainers = new HashSet<>();
- for (int i = 0; i < minPipelines; i++) {
+ for (int i = 0; i < providerConf.getMinimumPipelines(); i++) {
ContainerInfo container = provider.getContainer(
1, repConfig, OWNER, new ExcludeList());
assertFalse(allocatedContainers.contains(container));
@@ -412,8 +429,7 @@ public class TestWritableECContainerProvider {
public void testExcludedNodesPassedToCreatePipelineIfProvided()
throws IOException, TimeoutException {
PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
- provider = new WritableECContainerProvider(
- conf, pipelineManagerSpy, containerManager, pipelineChoosingPolicy);
+ provider = createSubject(pipelineManagerSpy);
ExcludeList excludeList = new ExcludeList();
// EmptyList should be passed if there are no nodes excluded.
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 3267b7a2a5..21defdb08b 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -520,6 +520,11 @@ public class ReplicationNodeManagerMock implements
NodeManager {
return 0;
}
+ @Override
+ public int totalHealthyVolumeCount() {
+ return 0;
+ }
+
@Override
public int pipelineLimit(DatanodeDetails dn) {
return 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]