HDDS-802. Container State Manager should get open pipelines for allocating 
container. Contributed by Lokesh Jain.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9317a61f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9317a61f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9317a61f

Branch: refs/heads/HDFS-13891
Commit: 9317a61f3cdc5ca91c6934eec9898cee3d65441a
Parents: c80f753
Author: Yiqun Lin <yq...@apache.org>
Authored: Thu Nov 8 23:41:43 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Thu Nov 8 23:41:43 2018 +0800

----------------------------------------------------------------------
 .../scm/container/ContainerStateManager.java    |  4 +-
 .../hdds/scm/pipeline/PipelineManager.java      |  3 +
 .../hdds/scm/pipeline/PipelineStateManager.java |  5 ++
 .../hdds/scm/pipeline/PipelineStateMap.java     | 22 +++++++
 .../hdds/scm/pipeline/SCMPipelineManager.java   | 11 ++++
 .../scm/pipeline/TestPipelineStateManager.java  | 61 ++++++++++++++++++--
 6 files changed, 100 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 87505c3..74c8dcb 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -248,8 +248,8 @@ public class ContainerStateManager {
     try {
       pipeline = pipelineManager.createPipeline(type, replicationFactor);
     } catch (IOException e) {
-      final List<Pipeline> pipelines =
-          pipelineManager.getPipelines(type, replicationFactor);
+      final List<Pipeline> pipelines = pipelineManager
+          .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
       if (pipelines.isEmpty()) {
         throw new IOException("Could not allocate container");
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 04ec535..cce09f3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -46,6 +46,9 @@ public interface PipelineManager extends Closeable {
   List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor);
 
+  List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor, Pipeline.PipelineState state);
+
   void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
       throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 67f74d3..9f95378 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -64,6 +64,11 @@ class PipelineStateManager {
     return pipelineStateMap.getPipelines(type, factor);
   }
 
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state) {
+    return pipelineStateMap.getPipelines(type, factor, state);
+  }
+
   List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
     return pipelineStateMap.getPipelines(type, states);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 7b69491..85790b2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -166,6 +166,28 @@ class PipelineStateMap {
   }
 
   /**
+   * Get list of pipeline corresponding to specified replication type,
+   * replication factor and pipeline state.
+   *
+   * @param type - ReplicationType
+   * @param state - Required PipelineState
+   * @return List of pipelines with specified replication type,
+   * replication factor and pipeline state
+   */
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state) {
+    Preconditions.checkNotNull(type, "Replication type cannot be null");
+    Preconditions.checkNotNull(factor, "Replication factor cannot be null");
+    Preconditions.checkNotNull(state, "Pipeline state cannot be null");
+
+    return pipelineMap.values().stream().filter(
+        pipeline -> pipeline.getType() == type
+            && pipeline.getPipelineState() == state
+            && pipeline.getFactor() == factor)
+        .collect(Collectors.toList());
+  }
+
+  /**
    * Get set of containerIDs corresponding to a pipeline.
    *
    * @param pipelineID - PipelineID

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 1c21748..5e8d0dc 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -166,6 +166,17 @@ public class SCMPipelineManager implements PipelineManager 
{
   }
 
   @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor, Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, factor, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public void addContainerToPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {
     lock.writeLock().lock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9317a61f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index fd6f76b..0f5692e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -157,8 +157,8 @@ public class TestPipelineStateManager {
             stateManager.getPipelines(type, factor);
         Assert.assertEquals(15, pipelines1.size());
         pipelines1.stream().forEach(p -> {
-          Assert.assertEquals(p.getType(), type);
-          Assert.assertEquals(p.getFactor(), factor);
+          Assert.assertEquals(type, p.getType());
+          Assert.assertEquals(factor, p.getFactor());
         });
       }
     }
@@ -203,8 +203,8 @@ public class TestPipelineStateManager {
           .getPipelines(type, Pipeline.PipelineState.OPEN);
       Assert.assertEquals(5, pipelines1.size());
       pipelines1.forEach(p -> {
-        Assert.assertEquals(p.getType(), type);
-        Assert.assertEquals(p.getPipelineState(), Pipeline.PipelineState.OPEN);
+        Assert.assertEquals(type, p.getType());
+        Assert.assertEquals(Pipeline.PipelineState.OPEN, p.getPipelineState());
       });
 
       pipelines1 = stateManager
@@ -220,6 +220,59 @@ public class TestPipelineStateManager {
   }
 
   @Test
+  public void testGetPipelinesByTypeFactorAndState() throws IOException {
+    Set<Pipeline> pipelines = new HashSet<>();
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+          .values()) {
+        for (int i = 0; i < 5; i++) {
+          // 5 pipelines in allocated state for each type and factor
+          Pipeline pipeline =
+              createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          pipelines.add(pipeline);
+
+          // 5 pipelines in open state for each type and factor
+          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          stateManager.openPipeline(pipeline.getId());
+          pipelines.add(pipeline);
+
+          // 5 pipelines in closed state for each type and factor
+          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          stateManager.finalizePipeline(pipeline.getId());
+          pipelines.add(pipeline);
+        }
+      }
+    }
+
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+          .values()) {
+        for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
+          // verify pipelines received
+          List<Pipeline> pipelines1 =
+              stateManager.getPipelines(type, factor, state);
+          Assert.assertEquals(5, pipelines1.size());
+          pipelines1.forEach(p -> {
+            Assert.assertEquals(type, p.getType());
+            Assert.assertEquals(factor, p.getFactor());
+            Assert.assertEquals(state, p.getPipelineState());
+          });
+        }
+      }
+    }
+
+    //clean up
+    for (Pipeline pipeline : pipelines) {
+      removePipeline(pipeline);
+    }
+  }
+
+  @Test
   public void testAddAndGetContainer() throws IOException {
     long containerID = 0;
     Pipeline pipeline = createDummyPipeline(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to