This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 3a94742 HDDS-5375. EC: Extend PipelineManager.createPipeline API to
support excluded nodes (#2378)
3a94742 is described below
commit 3a94742765e4693d09500a3efc24226a103dabf7
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Fri Jul 9 11:05:03 2021 +0100
HDDS-5375. EC: Extend PipelineManager.createPipeline API to support
excluded nodes (#2378)
---
.../hdds/scm/pipeline/ECPipelineProvider.java | 15 +++++--
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 6 +--
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 5 +++
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 12 +++++-
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 4 ++
.../hdds/scm/pipeline/RatisPipelineProvider.java | 13 +++++-
.../hdds/scm/pipeline/SCMPipelineManager.java | 12 +++++-
.../hdds/scm/pipeline/SimplePipelineProvider.java | 8 ++++
.../scm/pipeline/WritableECContainerProvider.java | 24 ++++++-----
.../hdds/scm/pipeline/MockPipelineManager.java | 9 +++++
.../hdds/scm/pipeline/TestECPipelineProvider.java | 21 ++++++++++
.../scm/pipeline/TestRatisPipelineProvider.java | 27 +++++++++++++
.../pipeline/TestWritableECContainerProvider.java | 47 +++++++++++++++++++---
.../ozone/recon/scm/ReconPipelineFactory.java | 10 +++++
14 files changed, 189 insertions(+), 24 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
index 5ce442f..d8fd343 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,10 +51,18 @@ public class ECPipelineProvider extends
PipelineProvider<ECReplicationConfig> {
}
@Override
- protected Pipeline create(ECReplicationConfig replicationConfig)
+ public synchronized Pipeline create(ECReplicationConfig replicationConfig)
throws IOException {
- List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
- null, replicationConfig.getRequiredNodes(), 0);
+ return create(replicationConfig, Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ @Override
+ protected Pipeline create(ECReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException {
+ List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(excludedNodes,
+ favoredNodes, replicationConfig.getRequiredNodes(), 0);
return create(replicationConfig, dns);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index b70b1a4..aec48cd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -80,12 +80,12 @@ public class PipelineFactory {
}
public Pipeline create(
- ReplicationConfig replicationConfig
- )
+ ReplicationConfig replicationConfig, List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes)
throws IOException {
return providers
.get(replicationConfig.getReplicationType())
- .create(replicationConfig);
+ .create(replicationConfig, excludedNodes, favoredNodes);
}
public Pipeline create(ReplicationConfig replicationConfig,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 6a35249..fed7f73 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -40,6 +40,11 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean {
throws IOException;
Pipeline createPipeline(
+ ReplicationConfig replicationConfig, List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes) throws IOException;
+
+
+ Pipeline createPipeline(
ReplicationConfig replicationConfig,
List<DatanodeDetails> nodes
);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index c383093..adc75f4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -49,6 +49,7 @@ import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -145,6 +146,14 @@ public class PipelineManagerV2Impl implements
PipelineManager {
public Pipeline createPipeline(
ReplicationConfig replicationConfig
) throws IOException {
+ return createPipeline(replicationConfig, Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ @Override
+ public Pipeline createPipeline(ReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException {
if (!isPipelineCreationAllowed() && !factorOne(replicationConfig)) {
LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
"complete");
@@ -153,7 +162,8 @@ public class PipelineManagerV2Impl implements
PipelineManager {
}
lock.lock();
try {
- Pipeline pipeline = pipelineFactory.create(replicationConfig);
+ Pipeline pipeline = pipelineFactory.create(replicationConfig,
+ excludedNodes, favoredNodes);
stateManager.addPipeline(pipeline.getProtobufMessage(
ClientVersions.CURRENT_VERSION));
recordMetricsForPipeline(pipeline);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 50bc0e1..499285c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -61,6 +61,10 @@ public abstract class PipelineProvider<REPLICATION_CONFIG
protected abstract Pipeline create(REPLICATION_CONFIG replicationConfig)
throws IOException;
+ protected abstract Pipeline create(REPLICATION_CONFIG replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException;
+
protected abstract Pipeline create(
REPLICATION_CONFIG replicationConfig,
List<DatanodeDetails> nodes
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 485ea83..8f4ed85 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -122,6 +123,14 @@ public class RatisPipelineProvider
@Override
public synchronized Pipeline create(RatisReplicationConfig replicationConfig)
throws IOException {
+ return create(replicationConfig, Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ @Override
+ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException {
if (exceedPipelineNumberLimit(replicationConfig)) {
throw new SCMException("Ratis pipeline number meets the limit: " +
pipelineNumberLimit + " replicationConfig : " +
@@ -138,8 +147,8 @@ public class RatisPipelineProvider
dns = pickNodesNeverUsed(replicationConfig);
break;
case THREE:
- dns = placementPolicy.chooseDatanodes(null,
- null, factor.getNumber(), 0);
+ dns = placementPolicy.chooseDatanodes(excludedNodes,
+ favoredNodes, factor.getNumber(), 0);
break;
default:
throw new IllegalStateException("Unknown factor: " + factor.name());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index cb2abb8..3ecd965 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -263,6 +264,14 @@ public class SCMPipelineManager implements
@Override
public Pipeline createPipeline(ReplicationConfig replicationConfig)
throws IOException {
+ return createPipeline(replicationConfig, Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ @Override
+ public Pipeline createPipeline(ReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException {
if (!isPipelineCreationAllowed()
&& replicationConfig.getRequiredNodes() != 1) {
LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
@@ -272,7 +281,8 @@ public class SCMPipelineManager implements
}
lock.writeLock().lock();
try {
- Pipeline pipeline = pipelineFactory.create(replicationConfig);
+ Pipeline pipeline = pipelineFactory.create(replicationConfig,
+ excludedNodes, favoredNodes);
if (pipelineStore != null) {
pipelineStore.put(pipeline.getId(), pipeline);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 98b700e..8a22947 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -41,6 +41,14 @@ public class SimplePipelineProvider
@Override
public Pipeline create(StandaloneReplicationConfig replicationConfig)
throws IOException {
+ return create(replicationConfig, Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ @Override
+ public Pipeline create(StandaloneReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException {
List<DatanodeDetails> dns = pickNodesNeverUsed(replicationConfig);
if (dns.size() < replicationConfig.getRequiredNodes()) {
String e = String
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index 2f3cf92..6947e79 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -37,6 +38,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
@@ -94,9 +97,7 @@ public class WritableECContainerProvider
Pipeline.PipelineState.OPEN);
if (openPipelineCount < providerConfig.getMinimumPipelines()) {
try {
- // TODO - PipelineManager should allow for creating a pipeline with
- // excluded nodes. HDDS-5375.
- return allocateContainer(repConfig, size, owner);
+ return allocateContainer(repConfig, size, owner, excludeList);
} catch (IOException e) {
LOG.warn("Unable to allocate a container for {} with {} existing "
+ "containers", repConfig, openPipelineCount, e);
@@ -144,12 +145,10 @@ public class WritableECContainerProvider
}
}
// If we get here, all the pipelines we tried were no good. So try to
// allocate a new one and use it.
try {
synchronized(this) {
- // TODO - PipelineManager should allow for creating a pipeline with
- // excluded nodes. HDDS-5375.
- return allocateContainer(repConfig, size, owner);
+ return allocateContainer(repConfig, size, owner, excludeList);
}
} catch (IOException e) {
LOG.error("Unable to allocate a container for {} after trying all "
@@ -159,8 +158,15 @@ public class WritableECContainerProvider
}
private ContainerInfo allocateContainer(ReplicationConfig repConfig,
- long size, String owner) throws IOException {
- Pipeline newPipeline = pipelineManager.createPipeline(repConfig);
+ long size, String owner, ExcludeList excludeList) throws IOException {
+
+ List<DatanodeDetails> excludedNodes = Collections.emptyList();
+ if (excludeList.getDatanodes().size() > 0) {
+ excludedNodes = new ArrayList<>(excludeList.getDatanodes());
+ }
+
+ Pipeline newPipeline = pipelineManager.createPipeline(repConfig,
+ excludedNodes, Collections.emptyList());
ContainerInfo container =
containerManager.getMatchingContainer(size, owner, newPipeline);
pipelineManager.openPipeline(newPipeline.getId());
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 9838e8b..3bfa8f9 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
@@ -49,6 +50,14 @@ public class MockPipelineManager implements PipelineManager {
@Override
public Pipeline createPipeline(ReplicationConfig replicationConfig)
throws IOException {
+ return createPipeline(replicationConfig, Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ @Override
+ public Pipeline createPipeline(ReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException {
final List<DatanodeDetails> nodes = Stream.generate(
MockDatanodeDetails::randomDatanodeDetails)
.limit(replicationConfig.getRequiredNodes())
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
index ff20f57..326bc1c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
@@ -35,6 +35,7 @@ import java.util.List;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
+import static org.mockito.Mockito.verify;
/**
* Test for the ECPipelineProvider.
@@ -81,4 +82,24 @@ public class TestECPipelineProvider {
}
}
+ @Test
+ public void testExcludedAndFavoredNodesPassedToPlacementPolicy()
+ throws IOException {
+ ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+
+ List<DatanodeDetails> excludedNodes = new ArrayList<>();
+ excludedNodes.add(MockDatanodeDetails.randomDatanodeDetails());
+
+ List<DatanodeDetails> favoredNodes = new ArrayList<>();
+ favoredNodes.add(MockDatanodeDetails.randomDatanodeDetails());
+
+ Pipeline pipeline = provider.create(ecConf, excludedNodes, favoredNodes);
+ Assert.assertEquals(EC, pipeline.getType());
+ Assert.assertEquals(ecConf.getData() + ecConf.getParity(),
+ pipeline.getNodes().size());
+
+ verify(placementPolicy).chooseDatanodes(excludedNodes, favoredNodes,
+ ecConf.getRequiredNodes(), 0);
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 8554c1f..ef86c2f 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -41,6 +42,7 @@ import java.util.stream.Collectors;
import static org.apache.commons.collections.CollectionUtils.intersection;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@@ -219,6 +221,31 @@ public class TestRatisPipelineProvider {
nodes.stream().anyMatch(membersOfClosedPipelines::contains));
}
+ @Test
+ // Test excluded nodes work correctly. Note that for Ratis, the
+ // PipelinePlacementPolicy, which Ratis is hardcoded to use, does not consider
+ // favored nodes.
+ public void testCreateFactorTHREEPipelineWithExcludedDatanodes()
+ throws Exception {
+ init(1);
+ int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy())
+ .size();
+ // Add all but 3 nodes to the exclude list and ensure that the 3 picked
+ // nodes are not in the excluded list.
+ List<DatanodeDetails> excludedNodes = nodeManager
+ .getNodes(NodeStatus.inServiceHealthy()).stream()
+ .limit(healthyCount - 3).collect(Collectors.toList());
+
+ Pipeline pipeline1 = provider.create(
+ new RatisReplicationConfig(ReplicationFactor.THREE), excludedNodes,
+ Collections.EMPTY_LIST);
+
+ for (DatanodeDetails dn : pipeline1.getNodes()) {
+ assertFalse(excludedNodes.contains(dn));
+ }
+ }
+
+
private void addPipeline(
List<DatanodeDetails> dns,
Pipeline.PipelineState open, ReplicationConfig replicationConfig)
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index ceaec5f..ce68286 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -20,6 +20,8 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -37,8 +39,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
@@ -52,6 +57,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
/**
* Tests to validate the WritableECContainerProvider works correctly.
@@ -162,7 +168,7 @@ public class TestWritableECContainerProvider {
allocatedContainers.add(container);
}
// We have the min limit of pipelines, but then exclude one. It should use
- // one of the existing rather than createing a new one, as the limit is
+ // one of the existing rather than creating a new one, as the limit is
// checked against all pipelines, not just the filtered list
ExcludeList exclude = new ExcludeList();
for (ContainerInfo c : allocatedContainers) {
@@ -196,8 +202,9 @@ public class TestWritableECContainerProvider {
public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
pipelineManager = new MockPipelineManager() {
@Override
- public Pipeline createPipeline(ReplicationConfig repConf)
- throws IOException {
+ public Pipeline createPipeline(ReplicationConfig repConf,
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes) throws IOException {
throw new IOException("Cannot create pipelines");
}
};
@@ -217,8 +224,9 @@ public class TestWritableECContainerProvider {
private boolean throwError = false;
@Override
- public Pipeline createPipeline(ReplicationConfig repConf)
- throws IOException {
+ public Pipeline createPipeline(ReplicationConfig repConf,
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes) throws IOException {
if (throwError) {
throw new IOException("Cannot create pipelines");
}
@@ -348,6 +356,35 @@ public class TestWritableECContainerProvider {
}
}
+ @Test
+ public void testExcludedNodesPassedToCreatePipelineIfProvided()
+ throws IOException {
+ PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
+ provider = new WritableECContainerProvider(
+ conf, pipelineManagerSpy, containerManager, pipelineChoosingPolicy);
+ ExcludeList excludeList = new ExcludeList();
+
+ // EmptyList should be passed if there are no nodes excluded.
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, excludeList);
+ assertNotNull(container);
+
+ verify(pipelineManagerSpy).createPipeline(repConfig,
+ Collections.emptyList(), Collections.emptyList());
+
+ // If nodes are excluded then the excluded nodes should be passed through to
+ // the create pipeline call.
+ excludeList.addDatanode(MockDatanodeDetails.randomDatanodeDetails());
+ List<DatanodeDetails> excludedNodes =
+ new ArrayList<>(excludeList.getDatanodes());
+
+ container = provider.getContainer(
+ 1, repConfig, OWNER, excludeList);
+ assertNotNull(container);
+ verify(pipelineManagerSpy).createPipeline(repConfig, excludedNodes,
+ Collections.emptyList());
+ }
+
private ContainerInfo createContainer(Pipeline pipeline,
ReplicationConfig repConf, long containerID) {
return new ContainerInfo.Builder()
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
index f390ed7..885f3f3 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
@@ -52,6 +52,16 @@ public class ReconPipelineFactory extends PipelineFactory {
@Override
public Pipeline create(ReplicationConfig config,
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes) {
+ // We don't expect this to be called at all. But adding this as a red
+ // flag for troubleshooting.
+ throw new UnsupportedOperationException(
+ "Trying to create pipeline in Recon, which is prohibited!");
+ }
+
+ @Override
+ public Pipeline create(ReplicationConfig config,
List<DatanodeDetails> nodes) {
throw new UnsupportedOperationException(
"Trying to create pipeline in Recon, which is prohibited!");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]