This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new c4aec7b HDDS-6259. EC: Pipelines for closed containers should contain
correct replica indexes (#3062)
c4aec7b is described below
commit c4aec7bb976867623c0f13c4795963b97855a109
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Feb 10 18:03:40 2022 +0000
HDDS-6259. EC: Pipelines for closed containers should contain correct
replica indexes (#3062)
---
.../hdds/scm/pipeline/ECPipelineProvider.java | 28 ++++++++++++--
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 8 ++++
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 5 +++
.../hdds/scm/pipeline/PipelineManagerImpl.java | 7 ++++
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 6 +++
.../hdds/scm/pipeline/RatisPipelineProvider.java | 13 +++++++
.../hdds/scm/pipeline/SimplePipelineProvider.java | 12 ++++++
.../hdds/scm/server/SCMClientProtocolServer.java | 7 +---
.../hdds/scm/pipeline/MockPipelineManager.java | 23 ++++++++++++
.../hdds/scm/pipeline/TestECPipelineProvider.java | 41 +++++++++++++++++++++
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 43 ++++++++++++++++++++++
.../scm/pipeline/TestRatisPipelineProvider.java | 27 ++++++++++++++
.../ozone/recon/scm/ReconPipelineFactory.java | 9 +++++
13 files changed, 221 insertions(+), 8 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
index 2c5caa4..11c23f1 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -23,13 +23,16 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Class to create pipelines for EC containers.
@@ -84,12 +87,31 @@ public class ECPipelineProvider extends
PipelineProvider<ECReplicationConfig> {
ecIndex++;
}
+ return createPipelineInternal(replicationConfig, nodes, dnIndexes);
+ }
+
+ @Override
+ public Pipeline createForRead(
+ ECReplicationConfig replicationConfig,
+ Set<ContainerReplica> replicas) {
+ Map<DatanodeDetails, Integer> map = new HashMap<>();
+ List<DatanodeDetails> dns = new ArrayList<>(replicas.size());
+
+ for (ContainerReplica r : replicas) {
+ map.put(r.getDatanodeDetails(), r.getReplicaIndex());
+ dns.add(r.getDatanodeDetails());
+ }
+ return createPipelineInternal(replicationConfig, dns, map);
+ }
+
+ private Pipeline createPipelineInternal(ECReplicationConfig repConfig,
+ List<DatanodeDetails> dns, Map<DatanodeDetails, Integer> indexes) {
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(Pipeline.PipelineState.ALLOCATED)
- .setReplicationConfig(replicationConfig)
- .setNodes(nodes)
- .setReplicaIndexes(dnIndexes)
+ .setReplicationConfig(repConfig)
+ .setNodes(dns)
+ .setReplicaIndexes(indexes)
.build();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 444de35..780a4ee 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -36,6 +37,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Creates pipeline based on replication type.
@@ -95,6 +97,12 @@ public class PipelineFactory {
.create(replicationConfig, nodes);
}
+ public Pipeline createForRead(ReplicationConfig replicationConfig,
+ Set<ContainerReplica> replicas) {
+ return providers.get(replicationConfig.getReplicationType())
+ .createForRead(replicationConfig, replicas);
+ }
+
public void close(ReplicationType type, Pipeline pipeline)
throws IOException {
providers.get(type).close(pipeline);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index ce6e51a..6a50876 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -23,10 +23,12 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.utils.db.Table;
/**
@@ -49,6 +51,9 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean {
List<DatanodeDetails> nodes
);
+ Pipeline createPipelineForRead(
+ ReplicationConfig replicationConfig, Set<ContainerReplica> replicas);
+
Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
boolean containsPipeline(PipelineID pipelineID);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 8b5487c..aeafe6f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -31,6 +31,7 @@ import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -217,6 +218,12 @@ public class PipelineManagerImpl implements
PipelineManager {
}
@Override
+ public Pipeline createPipelineForRead(
+ ReplicationConfig replicationConfig, Set<ContainerReplica> replicas) {
+ return pipelineFactory.createForRead(replicationConfig, replicas);
+ }
+
+ @Override
public Pipeline getPipeline(PipelineID pipelineID)
throws PipelineNotFoundException {
return stateManager.getPipeline(pipelineID);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index b8c03eb..063ca76 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
@@ -74,6 +75,11 @@ public abstract class PipelineProvider<REPLICATION_CONFIG
List<DatanodeDetails> nodes
);
+ protected abstract Pipeline createForRead(
+ REPLICATION_CONFIG replicationConfig,
+ Set<ContainerReplica> replicas
+ );
+
protected abstract void close(Pipeline pipeline) throws IOException;
protected abstract void shutdown();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 7baffb6..224bd70 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hdds.scm.pipeline;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -28,6 +30,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -210,6 +213,16 @@ public class RatisPipelineProvider
}
@Override
+ public Pipeline createForRead(
+ RatisReplicationConfig replicationConfig,
+ Set<ContainerReplica> replicas) {
+ return create(replicationConfig, replicas
+ .stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
public void shutdown() {
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 0ab299f..a61ed27 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Implements Api for creating stand alone pipelines.
@@ -79,6 +82,15 @@ public class SimplePipelineProvider
}
@Override
+ public Pipeline createForRead(StandaloneReplicationConfig replicationConfig,
+ Set<ContainerReplica> replicas) {
+ return create(replicationConfig, replicas
+ .stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
public void close(Pipeline pipeline) throws IOException {
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index c95612e..72496a4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -256,12 +256,9 @@ public class SCMClientProtocolServer implements
}
if (pipeline == null) {
- pipeline = scm.getPipelineManager().createPipeline(
+ pipeline = scm.getPipelineManager().createPipelineForRead(
container.getReplicationConfig(),
- scm.getContainerManager()
- .getContainerReplicas(cid).stream()
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList()));
+ scm.getContainerManager().getContainerReplicas(cid));
}
return new ContainerWithPipeline(container, pipeline);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 2884145..00d96dc 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -30,11 +31,14 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.ClientVersions;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -94,6 +98,25 @@ public class MockPipelineManager implements PipelineManager {
}
@Override
+ public Pipeline createPipelineForRead(
+ final ReplicationConfig replicationConfig,
+ final Set<ContainerReplica> replicas) {
+ List<DatanodeDetails> dns = new ArrayList<>();
+ Map<DatanodeDetails, Integer> map = new HashMap<>();
+ for (ContainerReplica r : replicas) {
+ map.put(r.getDatanodeDetails(), r.getReplicaIndex());
+ dns.add(r.getDatanodeDetails());
+ }
+ return Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
+ .setReplicationConfig(replicationConfig)
+ .setNodes(dns)
+ .setReplicaIndexes(map)
+ .setState(Pipeline.PipelineState.CLOSED)
+ .build();
+ }
+
+ @Override
public Pipeline getPipeline(final PipelineID pipelineID)
throws PipelineNotFoundException {
return stateManager.getPipeline(pipelineID);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
index 7373b6c..d95ef85 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
@@ -23,8 +23,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.junit.Assert;
import org.junit.Before;
@@ -33,7 +36,10 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
@@ -91,6 +97,22 @@ public class TestECPipelineProvider {
}
@Test
+ public void testPipelineForReadCanBeCreated() {
+ ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+
+ Set<ContainerReplica> replicas = createContainerReplicas(4);
+ Pipeline pipeline = provider.createForRead(ecConf, replicas);
+
+ Assert.assertEquals(EC, pipeline.getType());
+ Assert.assertEquals(4, pipeline.getNodes().size());
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ for (ContainerReplica r : replicas) {
+ Assert.assertEquals(r.getReplicaIndex(),
+ pipeline.getReplicaIndex(r.getDatanodeDetails()));
+ }
+ }
+
+ @Test
public void testExcludedAndFavoredNodesPassedToPlacementPolicy()
throws IOException {
ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
@@ -110,4 +132,23 @@ public class TestECPipelineProvider {
ecConf.getRequiredNodes(), 0, containerSizeBytes);
}
+ private Set<ContainerReplica> createContainerReplicas(int number) {
+ Set<ContainerReplica> replicas = new HashSet<>();
+ for (int i = 0; i < number; i++) {
+ ContainerReplica r = ContainerReplica.newBuilder()
+ .setBytesUsed(1)
+ .setContainerID(ContainerID.valueOf(1))
+ .setContainerState(StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED)
+ .setKeyCount(1)
+ .setOriginNodeId(UUID.randomUUID())
+ .setSequenceId(1)
+ .setReplicaIndex(i + 1)
+ .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails())
+ .build();
+ replicas.add(r);
+ }
+ return replicas;
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 01020be..3152c39 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -25,10 +25,12 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -57,9 +60,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
@@ -709,6 +715,43 @@ public class TestPipelineManagerImpl {
assertTrue(containerLogIdx < pipelineLogIdx);
}
+ @Test
+ public void testCreatePipelineForRead() throws IOException {
+ PipelineManager pipelineManager = createPipelineManager(true);
+ List<DatanodeDetails> dns = nodeManager
+ .getNodes(NodeStatus.inServiceHealthy())
+ .stream()
+ .limit(3)
+ .collect(Collectors.toList());
+ Set<ContainerReplica> replicas = createContainerReplicasList(dns);
+ Pipeline pipeline = pipelineManager.createPipelineForRead(
+ new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
+ Assert.assertEquals(3, pipeline.getNodes().size());
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ Assert.assertTrue(dns.contains(dn));
+ }
+ }
+
+ private Set<ContainerReplica> createContainerReplicasList(
+ List <DatanodeDetails> dns) {
+ Set<ContainerReplica> replicas = new HashSet<>();
+ for (DatanodeDetails dn : dns) {
+ ContainerReplica r = ContainerReplica.newBuilder()
+ .setBytesUsed(1)
+ .setContainerID(ContainerID.valueOf(1))
+ .setContainerState(StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED)
+ .setKeyCount(1)
+ .setOriginNodeId(UUID.randomUUID())
+ .setSequenceId(1)
+ .setReplicaIndex(0)
+ .setDatanodeDetails(dn)
+ .build();
+ replicas.add(r);
+ }
+ return replicas;
+ }
+
private void sendPipelineReport(
DatanodeDetails dn, Pipeline pipeline,
PipelineReportHandler pipelineReportHandler,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 388e707..d8a44f5 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -27,7 +27,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -217,13 +220,17 @@ public class TestRatisPipelineProvider {
List<DatanodeDetails> healthyNodes = nodeManager
.getNodes(NodeStatus.inServiceHealthy()).stream()
.limit(3).collect(Collectors.toList());
+ Set<ContainerReplica> replicas = createContainerReplicas(healthyNodes);
Pipeline pipeline1 = provider.create(
new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
Pipeline pipeline2 = provider.create(
new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+ Pipeline pipeline3 = provider.createForRead(
+ new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
Assert.assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet());
+ Assert.assertEquals(pipeline2.getNodeSet(), pipeline3.getNodeSet());
cleanup();
}
@@ -358,4 +365,24 @@ public class TestRatisPipelineProvider {
stateManager.addPipeline(pipelineProto);
nodeManager.addPipeline(openPipeline);
}
+
+ private Set<ContainerReplica> createContainerReplicas(
+ List<DatanodeDetails> dns) {
+ Set<ContainerReplica> replicas = new HashSet<>();
+ for (DatanodeDetails dn : dns) {
+ ContainerReplica r = ContainerReplica.newBuilder()
+ .setBytesUsed(1)
+ .setContainerID(ContainerID.valueOf(1))
+ .setContainerState(StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED)
+ .setKeyCount(1)
+ .setOriginNodeId(UUID.randomUUID())
+ .setSequenceId(1)
+ .setReplicaIndex(0)
+ .setDatanodeDetails(dn)
+ .build();
+ replicas.add(r);
+ }
+ return replicas;
+ }
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
index 885f3f3..f84f31a 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.ozone.recon.scm;
import java.util.List;
+import java.util.Set;
import org.apache.commons.collections.map.DefaultedMap;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineFactory;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
@@ -66,6 +68,13 @@ public class ReconPipelineFactory extends PipelineFactory {
throw new UnsupportedOperationException(
"Trying to create pipeline in Recon, which is prohibited!");
}
+
+ @Override
+ public Pipeline createForRead(ReplicationConfig config,
+ Set<ContainerReplica> replicas) {
+ throw new UnsupportedOperationException(
+ "Trying to create pipeline in Recon, which is prohibited!");
+ }
@Override
public void close(Pipeline pipeline) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org