HDDS-802. Container State Manager should get open pipelines for allocating container. Contributed by Lokesh Jain.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9317a61f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9317a61f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9317a61f Branch: refs/heads/HDFS-13891 Commit: 9317a61f3cdc5ca91c6934eec9898cee3d65441a Parents: c80f753 Author: Yiqun Lin <yq...@apache.org> Authored: Thu Nov 8 23:41:43 2018 +0800 Committer: Yiqun Lin <yq...@apache.org> Committed: Thu Nov 8 23:41:43 2018 +0800 ---------------------------------------------------------------------- .../scm/container/ContainerStateManager.java | 4 +- .../hdds/scm/pipeline/PipelineManager.java | 3 + .../hdds/scm/pipeline/PipelineStateManager.java | 5 ++ .../hdds/scm/pipeline/PipelineStateMap.java | 22 +++++++ .../hdds/scm/pipeline/SCMPipelineManager.java | 11 ++++ .../scm/pipeline/TestPipelineStateManager.java | 61 ++++++++++++++++++-- 6 files changed, 100 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 87505c3..74c8dcb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -248,8 +248,8 @@ public class ContainerStateManager { try { pipeline = pipelineManager.createPipeline(type, replicationFactor); } catch (IOException e) { - final List<Pipeline> pipelines = - pipelineManager.getPipelines(type, replicationFactor); + final List<Pipeline> pipelines = pipelineManager + .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN); if (pipelines.isEmpty()) { throw new IOException("Could not allocate container"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 04ec535..cce09f3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -46,6 +46,9 @@ public interface PipelineManager extends Closeable { List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor); + List<Pipeline> getPipelines(ReplicationType type, + ReplicationFactor factor, Pipeline.PipelineState state); + void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 67f74d3..9f95378 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -64,6 +64,11 @@ class PipelineStateManager { return pipelineStateMap.getPipelines(type, factor); } + List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, + PipelineState state) { + return pipelineStateMap.getPipelines(type, factor, state); + } + List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) { return pipelineStateMap.getPipelines(type, states); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index 7b69491..85790b2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -166,6 +166,28 @@ class PipelineStateMap { } /** + * Get list of pipeline corresponding to specified replication type, + * replication factor and pipeline state. + * + * @param type - ReplicationType + * @param state - Required PipelineState + * @return List of pipelines with specified replication type, + * replication factor and pipeline state + */ + List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, + PipelineState state) { + Preconditions.checkNotNull(type, "Replication type cannot be null"); + Preconditions.checkNotNull(factor, "Replication factor cannot be null"); + Preconditions.checkNotNull(state, "Pipeline state cannot be null"); + + return pipelineMap.values().stream().filter( + pipeline -> pipeline.getType() == type + && pipeline.getPipelineState() == state + && pipeline.getFactor() == factor) + .collect(Collectors.toList()); + } + + /** * Get set of containerIDs corresponding to a pipeline. * * @param pipelineID - PipelineID http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 1c21748..5e8d0dc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -166,6 +166,17 @@ public class SCMPipelineManager implements PipelineManager { } @Override + public List<Pipeline> getPipelines(ReplicationType type, + ReplicationFactor factor, Pipeline.PipelineState state) { + lock.readLock().lock(); + try { + return stateManager.getPipelines(type, factor, state); + } finally { + lock.readLock().unlock(); + } + } + + @Override public void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException { lock.writeLock().lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index fd6f76b..0f5692e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -157,8 +157,8 @@ public class TestPipelineStateManager { stateManager.getPipelines(type, factor); Assert.assertEquals(15, pipelines1.size()); pipelines1.stream().forEach(p -> { - Assert.assertEquals(p.getType(), type); - Assert.assertEquals(p.getFactor(), factor); + Assert.assertEquals(type, p.getType()); + Assert.assertEquals(factor, p.getFactor()); }); } } @@ -203,8 +203,8 @@ public class TestPipelineStateManager { .getPipelines(type, Pipeline.PipelineState.OPEN); Assert.assertEquals(5, pipelines1.size()); pipelines1.forEach(p -> { - Assert.assertEquals(p.getType(), type); - Assert.assertEquals(p.getPipelineState(), Pipeline.PipelineState.OPEN); + Assert.assertEquals(type, p.getType()); + Assert.assertEquals(Pipeline.PipelineState.OPEN, p.getPipelineState()); }); pipelines1 = stateManager @@ -220,6 +220,59 @@ public class TestPipelineStateManager { } @Test + public void testGetPipelinesByTypeFactorAndState() throws IOException { + Set<Pipeline> pipelines = new HashSet<>(); + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + for (int i = 0; i < 5; i++) { + // 5 pipelines in allocated state for each type and factor + Pipeline pipeline = + createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + pipelines.add(pipeline); + + // 5 pipelines in open state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.openPipeline(pipeline.getId()); + pipelines.add(pipeline); + + // 5 pipelines in closed state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.finalizePipeline(pipeline.getId()); + pipelines.add(pipeline); + } + } + } + + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { + // verify pipelines received + List<Pipeline> pipelines1 = + stateManager.getPipelines(type, factor, state); + Assert.assertEquals(5, pipelines1.size()); + pipelines1.forEach(p -> { + Assert.assertEquals(type, p.getType()); + Assert.assertEquals(factor, p.getFactor()); + Assert.assertEquals(state, p.getPipelineState()); + }); + } + } + } + + //clean up + for (Pipeline pipeline : pipelines) { + removePipeline(pipeline); + } + } + + @Test public void testAddAndGetContainer() throws IOException { long containerID = 0; Pipeline pipeline = createDummyPipeline(1); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org