This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0f15552d79 HDDS-8919. Allow EC pipelines to be created and then added
to PipelineManager in two steps (#4968)
0f15552d79 is described below
commit 0f15552d79a58fa4f0b3d675d06cacbc65ecb59b
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Jun 26 22:04:15 2023 +0100
HDDS-8919. Allow EC pipelines to be created and then added to
PipelineManager in two steps (#4968)
---
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 7 ++
.../hdds/scm/pipeline/PipelineManagerImpl.java | 80 +++++++++++++++++++---
.../hdds/scm/pipeline/MockPipelineManager.java | 20 +++++-
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 15 +++-
4 files changed, 109 insertions(+), 13 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 60b385049a..4f224285e6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -45,6 +45,13 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean {
List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException;
+ Pipeline buildECPipeline(ReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes)
+ throws IOException, TimeoutException;
+
+ void addEcPipeline(Pipeline pipeline) throws IOException, TimeoutException;
+
Pipeline createPipeline(
ReplicationConfig replicationConfig,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index e7b6bf1f78..e234eb4115 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -195,6 +195,48 @@ public class PipelineManagerImpl implements
PipelineManager {
return pipelineManager;
}
+ /**
+ * Build a new pipeline and return it, but do not add it to the pipeline
+ * manager. This new pipeline will be in ALLOCATED state, but also
unavailable
+ * to clients in the system until it is added to the pipeline manager via the
+ * addPipeline method.
+ * @param replicationConfig
+ * @param excludedNodes
+ * @param favoredNodes
+ * @return The created pipeline.
+ * @throws IOException
+ */
+ @Override
+ public Pipeline buildECPipeline(ReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException {
+ if (replicationConfig.getReplicationType() != ReplicationType.EC) {
+ throw new IllegalArgumentException("Replication type must be EC");
+ }
+ checkIfPipelineCreationIsAllowed(replicationConfig);
+ return pipelineFactory.create(replicationConfig, excludedNodes,
+ favoredNodes);
+ }
+
+ /**
+ * Add a previously built pipeline to the pipeline manager. This will allow
+ * the pipline to be used by clients in the system.
+ * @param pipeline
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ @Override
+ public void addEcPipeline(Pipeline pipeline)
+ throws IOException, TimeoutException {
+ if (pipeline.getReplicationConfig().getReplicationType()
+ != ReplicationType.EC) {
+ throw new IllegalArgumentException(
+ "Pipeline replication type must be EC");
+ }
+ checkIfPipelineCreationIsAllowed(pipeline.getReplicationConfig());
+ addPipelineToManager(pipeline);
+ }
+
@Override
public Pipeline createPipeline(ReplicationConfig replicationConfig)
throws IOException, TimeoutException {
@@ -206,6 +248,27 @@ public class PipelineManagerImpl implements
PipelineManager {
public Pipeline createPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException {
+ checkIfPipelineCreationIsAllowed(replicationConfig);
+
+ acquireWriteLock();
+ final Pipeline pipeline;
+ try {
+ try {
+ pipeline = pipelineFactory.create(replicationConfig,
+ excludedNodes, favoredNodes);
+ } catch (IOException e) {
+ metrics.incNumPipelineCreationFailed();
+ throw e;
+ }
+ addPipelineToManager(pipeline);
+ return pipeline;
+ } finally {
+ releaseWriteLock();
+ }
+ }
+
+ private void checkIfPipelineCreationIsAllowed(
+ ReplicationConfig replicationConfig) throws IOException {
if (!isPipelineCreationAllowed() && !factorOne(replicationConfig)) {
LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
"complete");
@@ -219,25 +282,24 @@ public class PipelineManagerImpl implements
PipelineManager {
LOG.info(message);
throw new IOException(message);
}
+ }
+ private void addPipelineToManager(Pipeline pipeline)
+ throws IOException, TimeoutException {
+ HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+ ClientVersion.CURRENT_VERSION);
acquireWriteLock();
- final Pipeline pipeline;
try {
- pipeline = pipelineFactory.create(replicationConfig,
- excludedNodes, favoredNodes);
- stateManager.addPipeline(pipeline.getProtobufMessage(
- ClientVersion.CURRENT_VERSION));
+ stateManager.addPipeline(pipelineProto);
} catch (IOException | TimeoutException ex) {
- LOG.debug("Failed to create pipeline with replicationConfig {}.",
- replicationConfig, ex);
+ LOG.debug("Failed to add pipeline {}.", pipeline, ex);
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
releaseWriteLock();
}
- LOG.info("Created pipeline {}.", pipeline);
+ LOG.info("Added pipeline {}.", pipeline);
recordMetricsForPipeline(pipeline);
- return pipeline;
}
private boolean factorOne(ReplicationConfig replicationConfig) {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 5ca444785a..5303f44c26 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -71,8 +71,20 @@ public class MockPipelineManager implements PipelineManager {
public Pipeline createPipeline(ReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException, TimeoutException {
+ Pipeline pipeline = buildECPipeline(replicationConfig, excludedNodes,
+ favoredNodes);
+
+ stateManager.addPipeline(pipeline.getProtobufMessage(
+ ClientVersion.CURRENT_VERSION));
+ return pipeline;
+ }
+
+ @Override
+ public Pipeline buildECPipeline(ReplicationConfig replicationConfig,
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
+ throws IOException, TimeoutException {
final List<DatanodeDetails> nodes = Stream.generate(
- MockDatanodeDetails::randomDatanodeDetails)
+ MockDatanodeDetails::randomDatanodeDetails)
.limit(replicationConfig.getRequiredNodes())
.collect(Collectors.toList());
final Pipeline pipeline = Pipeline.newBuilder()
@@ -81,10 +93,14 @@ public class MockPipelineManager implements PipelineManager
{
.setNodes(nodes)
.setState(Pipeline.PipelineState.OPEN)
.build();
+ return pipeline;
+ }
+ @Override
+ public void addEcPipeline(Pipeline pipeline)
+ throws IOException, TimeoutException {
stateManager.addPipeline(pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION));
- return pipeline;
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 80d3515247..95ccf87f0b 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -72,6 +72,7 @@ import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -194,6 +195,16 @@ public class TestPipelineManagerImpl {
RatisReplicationConfig.getInstance(ReplicationFactor.ONE));
Assertions.assertEquals(2, pipelineManager.getPipelines().size());
Assertions.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
+
+ Pipeline builtPipeline = pipelineManager.buildECPipeline(
+ new ECReplicationConfig(3, 2),
+ Collections.emptyList(), Collections.emptyList());
+ pipelineManager.addEcPipeline(builtPipeline);
+
+ Assertions.assertEquals(3, pipelineManager.getPipelines().size());
+ Assertions.assertTrue(pipelineManager.containsPipeline(
+ builtPipeline.getId()));
+
buffer1.close();
pipelineManager.close();
@@ -203,11 +214,11 @@ public class TestPipelineManagerImpl {
createPipelineManager(true, buffer2);
// Should be able to load previous pipelines.
Assertions.assertFalse(pipelineManager2.getPipelines().isEmpty());
- Assertions.assertEquals(2, pipelineManager.getPipelines().size());
+ Assertions.assertEquals(3, pipelineManager.getPipelines().size());
Pipeline pipeline3 = pipelineManager2.createPipeline(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
buffer2.close();
- Assertions.assertEquals(3, pipelineManager2.getPipelines().size());
+ Assertions.assertEquals(4, pipelineManager2.getPipelines().size());
Assertions.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId()));
pipelineManager2.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]