This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f15552d79 HDDS-8919. Allow EC pipelines to be created and then added 
to PipelineManager in two steps (#4968)
0f15552d79 is described below

commit 0f15552d79a58fa4f0b3d675d06cacbc65ecb59b
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Jun 26 22:04:15 2023 +0100

    HDDS-8919. Allow EC pipelines to be created and then added to 
PipelineManager in two steps (#4968)
---
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  7 ++
 .../hdds/scm/pipeline/PipelineManagerImpl.java     | 80 +++++++++++++++++++---
 .../hdds/scm/pipeline/MockPipelineManager.java     | 20 +++++-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 15 +++-
 4 files changed, 109 insertions(+), 13 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 60b385049a..4f224285e6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -45,6 +45,13 @@ public interface PipelineManager extends Closeable, 
PipelineManagerMXBean {
                           List<DatanodeDetails> favoredNodes)
       throws IOException, TimeoutException;
 
+  Pipeline buildECPipeline(ReplicationConfig replicationConfig,
+                           List<DatanodeDetails> excludedNodes,
+                           List<DatanodeDetails> favoredNodes)
+      throws IOException, TimeoutException;
+
+  void addEcPipeline(Pipeline pipeline) throws IOException, TimeoutException;
+
 
   Pipeline createPipeline(
       ReplicationConfig replicationConfig,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index e7b6bf1f78..e234eb4115 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -195,6 +195,48 @@ public class PipelineManagerImpl implements 
PipelineManager {
     return pipelineManager;
   }
 
+  /**
+   * Build a new pipeline and return it, but do not add it to the pipeline
+   * manager. This new pipeline will be in ALLOCATED state, but also 
unavailable
+   * to clients in the system until it is added to the pipeline manager via the
+   * addEcPipeline method.
+   * @param replicationConfig
+   * @param excludedNodes
+   * @param favoredNodes
+   * @return The created pipeline.
+   * @throws IOException
+   */
+  @Override
+  public Pipeline buildECPipeline(ReplicationConfig replicationConfig,
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+      throws IOException {
+    if (replicationConfig.getReplicationType() != ReplicationType.EC) {
+      throw new IllegalArgumentException("Replication type must be EC");
+    }
+    checkIfPipelineCreationIsAllowed(replicationConfig);
+    return pipelineFactory.create(replicationConfig, excludedNodes,
+        favoredNodes);
+  }
+
+  /**
+   * Add a previously built pipeline to the pipeline manager. This will allow
+   * the pipeline to be used by clients in the system.
+   * @param pipeline
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  @Override
+  public void addEcPipeline(Pipeline pipeline)
+      throws IOException, TimeoutException {
+    if (pipeline.getReplicationConfig().getReplicationType()
+        != ReplicationType.EC) {
+      throw new IllegalArgumentException(
+          "Pipeline replication type must be EC");
+    }
+    checkIfPipelineCreationIsAllowed(pipeline.getReplicationConfig());
+    addPipelineToManager(pipeline);
+  }
+
   @Override
   public Pipeline createPipeline(ReplicationConfig replicationConfig)
       throws IOException, TimeoutException {
@@ -206,6 +248,27 @@ public class PipelineManagerImpl implements 
PipelineManager {
   public Pipeline createPipeline(ReplicationConfig replicationConfig,
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
       throws IOException, TimeoutException {
+    checkIfPipelineCreationIsAllowed(replicationConfig);
+
+    acquireWriteLock();
+    final Pipeline pipeline;
+    try {
+      try {
+        pipeline = pipelineFactory.create(replicationConfig,
+            excludedNodes, favoredNodes);
+      } catch (IOException e) {
+        metrics.incNumPipelineCreationFailed();
+        throw e;
+      }
+      addPipelineToManager(pipeline);
+      return pipeline;
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  private void checkIfPipelineCreationIsAllowed(
+      ReplicationConfig replicationConfig) throws IOException {
     if (!isPipelineCreationAllowed() && !factorOne(replicationConfig)) {
       LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
           "complete");
@@ -219,25 +282,24 @@ public class PipelineManagerImpl implements 
PipelineManager {
       LOG.info(message);
       throw new IOException(message);
     }
+  }
 
+  private void addPipelineToManager(Pipeline pipeline)
+      throws IOException, TimeoutException {
+    HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+        ClientVersion.CURRENT_VERSION);
     acquireWriteLock();
-    final Pipeline pipeline;
     try {
-      pipeline = pipelineFactory.create(replicationConfig,
-          excludedNodes, favoredNodes);
-      stateManager.addPipeline(pipeline.getProtobufMessage(
-          ClientVersion.CURRENT_VERSION));
+      stateManager.addPipeline(pipelineProto);
     } catch (IOException | TimeoutException ex) {
-      LOG.debug("Failed to create pipeline with replicationConfig {}.",
-          replicationConfig, ex);
+      LOG.debug("Failed to add pipeline {}.", pipeline, ex);
       metrics.incNumPipelineCreationFailed();
       throw ex;
     } finally {
       releaseWriteLock();
     }
-    LOG.info("Created pipeline {}.", pipeline);
+    LOG.info("Added pipeline {}.", pipeline);
     recordMetricsForPipeline(pipeline);
-    return pipeline;
   }
 
   private boolean factorOne(ReplicationConfig replicationConfig) {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 5ca444785a..5303f44c26 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -71,8 +71,20 @@ public class MockPipelineManager implements PipelineManager {
   public Pipeline createPipeline(ReplicationConfig replicationConfig,
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
       throws IOException, TimeoutException {
+    Pipeline pipeline = buildECPipeline(replicationConfig, excludedNodes,
+        favoredNodes);
+
+    stateManager.addPipeline(pipeline.getProtobufMessage(
+        ClientVersion.CURRENT_VERSION));
+    return pipeline;
+  }
+
+  @Override
+  public Pipeline buildECPipeline(ReplicationConfig replicationConfig,
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+      throws IOException, TimeoutException {
     final List<DatanodeDetails> nodes = Stream.generate(
-        MockDatanodeDetails::randomDatanodeDetails)
+            MockDatanodeDetails::randomDatanodeDetails)
         .limit(replicationConfig.getRequiredNodes())
         .collect(Collectors.toList());
     final Pipeline pipeline = Pipeline.newBuilder()
@@ -81,10 +93,14 @@ public class MockPipelineManager implements PipelineManager 
{
         .setNodes(nodes)
         .setState(Pipeline.PipelineState.OPEN)
         .build();
+    return pipeline;
+  }
 
+  @Override
+  public void addEcPipeline(Pipeline pipeline)
+      throws IOException, TimeoutException {
     stateManager.addPipeline(pipeline.getProtobufMessage(
         ClientVersion.CURRENT_VERSION));
-    return pipeline;
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 80d3515247..95ccf87f0b 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -72,6 +72,7 @@ import java.time.Instant;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -194,6 +195,16 @@ public class TestPipelineManagerImpl {
         RatisReplicationConfig.getInstance(ReplicationFactor.ONE));
     Assertions.assertEquals(2, pipelineManager.getPipelines().size());
     Assertions.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
+
+    Pipeline builtPipeline = pipelineManager.buildECPipeline(
+        new ECReplicationConfig(3, 2),
+        Collections.emptyList(), Collections.emptyList());
+    pipelineManager.addEcPipeline(builtPipeline);
+
+    Assertions.assertEquals(3, pipelineManager.getPipelines().size());
+    Assertions.assertTrue(pipelineManager.containsPipeline(
+        builtPipeline.getId()));
+
     buffer1.close();
     pipelineManager.close();
 
@@ -203,11 +214,11 @@ public class TestPipelineManagerImpl {
         createPipelineManager(true, buffer2);
     // Should be able to load previous pipelines.
     Assertions.assertFalse(pipelineManager2.getPipelines().isEmpty());
-    Assertions.assertEquals(2, pipelineManager.getPipelines().size());
+    Assertions.assertEquals(3, pipelineManager.getPipelines().size());
     Pipeline pipeline3 = pipelineManager2.createPipeline(
         RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
     buffer2.close();
-    Assertions.assertEquals(3, pipelineManager2.getPipelines().size());
+    Assertions.assertEquals(4, pipelineManager2.getPipelines().size());
     
Assertions.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId()));
 
     pipelineManager2.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to