This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new c4aec7b  HDDS-6259. EC: Pipelines for closed containers should contain 
correct replica indexes (#3062)
c4aec7b is described below

commit c4aec7bb976867623c0f13c4795963b97855a109
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Feb 10 18:03:40 2022 +0000

    HDDS-6259. EC: Pipelines for closed containers should contain correct 
replica indexes (#3062)
---
 .../hdds/scm/pipeline/ECPipelineProvider.java      | 28 ++++++++++++--
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  8 ++++
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  5 +++
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  7 ++++
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |  6 +++
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 13 +++++++
 .../hdds/scm/pipeline/SimplePipelineProvider.java  | 12 ++++++
 .../hdds/scm/server/SCMClientProtocolServer.java   |  7 +---
 .../hdds/scm/pipeline/MockPipelineManager.java     | 23 ++++++++++++
 .../hdds/scm/pipeline/TestECPipelineProvider.java  | 41 +++++++++++++++++++++
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 43 ++++++++++++++++++++++
 .../scm/pipeline/TestRatisPipelineProvider.java    | 27 ++++++++++++++
 .../ozone/recon/scm/ReconPipelineFactory.java      |  9 +++++
 13 files changed, 221 insertions(+), 8 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
index 2c5caa4..11c23f1 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -23,13 +23,16 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Class to create pipelines for EC containers.
@@ -84,12 +87,31 @@ public class ECPipelineProvider extends 
PipelineProvider<ECReplicationConfig> {
       ecIndex++;
     }
 
+    return createPipelineInternal(replicationConfig, nodes, dnIndexes);
+  }
+
+  @Override
+  public Pipeline createForRead(
+      ECReplicationConfig replicationConfig,
+      Set<ContainerReplica> replicas) {
+    Map<DatanodeDetails, Integer> map = new HashMap<>();
+    List<DatanodeDetails> dns = new ArrayList<>(replicas.size());
+
+    for (ContainerReplica r : replicas) {
+      map.put(r.getDatanodeDetails(), r.getReplicaIndex());
+      dns.add(r.getDatanodeDetails());
+    }
+    return createPipelineInternal(replicationConfig, dns, map);
+  }
+
+  private Pipeline createPipelineInternal(ECReplicationConfig repConfig,
+      List<DatanodeDetails> dns, Map<DatanodeDetails, Integer> indexes) {
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
         .setState(Pipeline.PipelineState.ALLOCATED)
-        .setReplicationConfig(replicationConfig)
-        .setNodes(nodes)
-        .setReplicaIndexes(dnIndexes)
+        .setReplicationConfig(repConfig)
+        .setNodes(dns)
+        .setReplicaIndexes(indexes)
         .build();
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 444de35..780a4ee 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Creates pipeline based on replication type.
@@ -95,6 +97,12 @@ public class PipelineFactory {
         .create(replicationConfig, nodes);
   }
 
+  public Pipeline createForRead(ReplicationConfig replicationConfig,
+      Set<ContainerReplica> replicas) {
+    return providers.get(replicationConfig.getReplicationType())
+        .createForRead(replicationConfig, replicas);
+  }
+
   public void close(ReplicationType type, Pipeline pipeline)
       throws IOException {
     providers.get(type).close(pipeline);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index ce6e51a..6a50876 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -23,10 +23,12 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.utils.db.Table;
 
 /**
@@ -49,6 +51,9 @@ public interface PipelineManager extends Closeable, 
PipelineManagerMXBean {
       List<DatanodeDetails> nodes
   );
 
+  Pipeline createPipelineForRead(
+      ReplicationConfig replicationConfig, Set<ContainerReplica> replicas);
+
   Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
 
   boolean containsPipeline(PipelineID pipelineID);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 8b5487c..aeafe6f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -217,6 +218,12 @@ public class PipelineManagerImpl implements 
PipelineManager {
   }
 
   @Override
+  public Pipeline createPipelineForRead(
+      ReplicationConfig replicationConfig, Set<ContainerReplica> replicas) {
+    return pipelineFactory.createForRead(replicationConfig, replicas);
+  }
+
+  @Override
   public Pipeline getPipeline(PipelineID pipelineID)
       throws PipelineNotFoundException {
     return stateManager.getPipeline(pipelineID);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index b8c03eb..063ca76 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
@@ -74,6 +75,11 @@ public abstract class PipelineProvider<REPLICATION_CONFIG
       List<DatanodeDetails> nodes
   );
 
+  protected abstract Pipeline createForRead(
+      REPLICATION_CONFIG replicationConfig,
+      Set<ContainerReplica> replicas
+  );
+
   protected abstract void close(Pipeline pipeline) throws IOException;
 
   protected abstract void shutdown();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 7baffb6..224bd70 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -28,6 +30,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -210,6 +213,16 @@ public class RatisPipelineProvider
   }
 
   @Override
+  public Pipeline createForRead(
+      RatisReplicationConfig replicationConfig,
+      Set<ContainerReplica> replicas) {
+    return create(replicationConfig, replicas
+        .stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList()));
+  }
+
+  @Override
   public void shutdown() {
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 0ab299f..a61ed27 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Implements Api for creating stand alone pipelines.
@@ -79,6 +82,15 @@ public class SimplePipelineProvider
   }
 
   @Override
+  public Pipeline createForRead(StandaloneReplicationConfig replicationConfig,
+      Set<ContainerReplica> replicas) {
+    return create(replicationConfig, replicas
+        .stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList()));
+  }
+
+  @Override
   public void close(Pipeline pipeline) throws IOException {
 
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index c95612e..72496a4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -256,12 +256,9 @@ public class SCMClientProtocolServer implements
     }
 
     if (pipeline == null) {
-      pipeline = scm.getPipelineManager().createPipeline(
+      pipeline = scm.getPipelineManager().createPipelineForRead(
           container.getReplicationConfig(),
-          scm.getContainerManager()
-              .getContainerReplicas(cid).stream()
-              .map(ContainerReplica::getDatanodeDetails)
-              .collect(Collectors.toList()));
+          scm.getContainerManager().getContainerReplicas(cid));
     }
 
     return new ContainerWithPipeline(container, pipeline);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 2884145..00d96dc 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -30,11 +31,14 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.ClientVersions;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -94,6 +98,25 @@ public class MockPipelineManager implements PipelineManager {
   }
 
   @Override
+  public Pipeline createPipelineForRead(
+      final ReplicationConfig replicationConfig,
+      final Set<ContainerReplica> replicas) {
+    List<DatanodeDetails> dns = new ArrayList<>();
+    Map<DatanodeDetails, Integer> map = new HashMap<>();
+    for (ContainerReplica r : replicas) {
+      map.put(r.getDatanodeDetails(), r.getReplicaIndex());
+      dns.add(r.getDatanodeDetails());
+    }
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setReplicationConfig(replicationConfig)
+        .setNodes(dns)
+        .setReplicaIndexes(map)
+        .setState(Pipeline.PipelineState.CLOSED)
+        .build();
+  }
+
+  @Override
   public Pipeline getPipeline(final PipelineID pipelineID)
       throws PipelineNotFoundException {
     return stateManager.getPipeline(pipelineID);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
index 7373b6c..d95ef85 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
@@ -23,8 +23,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,7 +36,10 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 import static 
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
@@ -91,6 +97,22 @@ public class TestECPipelineProvider {
   }
 
   @Test
+  public void testPipelineForReadCanBeCreated() {
+    ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+
+    Set<ContainerReplica> replicas = createContainerReplicas(4);
+    Pipeline pipeline = provider.createForRead(ecConf, replicas);
+
+    Assert.assertEquals(EC, pipeline.getType());
+    Assert.assertEquals(4, pipeline.getNodes().size());
+    Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    for (ContainerReplica r : replicas) {
+      Assert.assertEquals(r.getReplicaIndex(),
+          pipeline.getReplicaIndex(r.getDatanodeDetails()));
+    }
+  }
+
+  @Test
   public void testExcludedAndFavoredNodesPassedToPlacementPolicy()
       throws IOException {
     ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
@@ -110,4 +132,23 @@ public class TestECPipelineProvider {
         ecConf.getRequiredNodes(), 0, containerSizeBytes);
   }
 
+  private Set<ContainerReplica> createContainerReplicas(int number) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (int i = 0; i < number; i++) {
+      ContainerReplica r = ContainerReplica.newBuilder()
+          .setBytesUsed(1)
+          .setContainerID(ContainerID.valueOf(1))
+          .setContainerState(StorageContainerDatanodeProtocolProtos
+              .ContainerReplicaProto.State.CLOSED)
+          .setKeyCount(1)
+          .setOriginNodeId(UUID.randomUUID())
+          .setSequenceId(1)
+          .setReplicaIndex(i + 1)
+          .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails())
+          .build();
+      replicas.add(r);
+    }
+    return replicas;
+  }
+
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 01020be..3152c39 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -25,10 +25,12 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -57,9 +60,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
@@ -709,6 +715,43 @@ public class TestPipelineManagerImpl {
     assertTrue(containerLogIdx < pipelineLogIdx);
   }
 
+  @Test
+  public void testCreatePipelineForRead() throws IOException {
+    PipelineManager pipelineManager = createPipelineManager(true);
+    List<DatanodeDetails> dns = nodeManager
+        .getNodes(NodeStatus.inServiceHealthy())
+        .stream()
+        .limit(3)
+        .collect(Collectors.toList());
+    Set<ContainerReplica> replicas = createContainerReplicasList(dns);
+    Pipeline pipeline = pipelineManager.createPipelineForRead(
+        new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
+    Assert.assertEquals(3, pipeline.getNodes().size());
+    for (DatanodeDetails dn : pipeline.getNodes())  {
+      Assert.assertTrue(dns.contains(dn));
+    }
+  }
+
+  private Set<ContainerReplica> createContainerReplicasList(
+      List <DatanodeDetails> dns) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (DatanodeDetails dn : dns) {
+      ContainerReplica r = ContainerReplica.newBuilder()
+          .setBytesUsed(1)
+          .setContainerID(ContainerID.valueOf(1))
+          .setContainerState(StorageContainerDatanodeProtocolProtos
+              .ContainerReplicaProto.State.CLOSED)
+          .setKeyCount(1)
+          .setOriginNodeId(UUID.randomUUID())
+          .setSequenceId(1)
+          .setReplicaIndex(0)
+          .setDatanodeDetails(dn)
+          .build();
+      replicas.add(r);
+    }
+    return replicas;
+  }
+
   private void sendPipelineReport(
       DatanodeDetails dn, Pipeline pipeline,
       PipelineReportHandler pipelineReportHandler,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 388e707..d8a44f5 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -27,7 +27,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -217,13 +220,17 @@ public class TestRatisPipelineProvider {
     List<DatanodeDetails> healthyNodes = nodeManager
         .getNodes(NodeStatus.inServiceHealthy()).stream()
         .limit(3).collect(Collectors.toList());
+    Set<ContainerReplica> replicas = createContainerReplicas(healthyNodes);
 
     Pipeline pipeline1 = provider.create(
         new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
     Pipeline pipeline2 = provider.create(
         new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+    Pipeline pipeline3 = provider.createForRead(
+        new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
 
     Assert.assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet());
+    Assert.assertEquals(pipeline2.getNodeSet(), pipeline3.getNodeSet());
     cleanup();
   }
 
@@ -358,4 +365,24 @@ public class TestRatisPipelineProvider {
     stateManager.addPipeline(pipelineProto);
     nodeManager.addPipeline(openPipeline);
   }
+
+  private Set<ContainerReplica> createContainerReplicas(
+      List<DatanodeDetails> dns) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (DatanodeDetails dn : dns) {
+      ContainerReplica r = ContainerReplica.newBuilder()
+          .setBytesUsed(1)
+          .setContainerID(ContainerID.valueOf(1))
+          .setContainerState(StorageContainerDatanodeProtocolProtos
+              .ContainerReplicaProto.State.CLOSED)
+          .setKeyCount(1)
+          .setOriginNodeId(UUID.randomUUID())
+          .setSequenceId(1)
+          .setReplicaIndex(0)
+          .setDatanodeDetails(dn)
+          .build();
+      replicas.add(r);
+    }
+    return replicas;
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
index 885f3f3..f84f31a 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.ozone.recon.scm;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.collections.map.DefaultedMap;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineFactory;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
@@ -66,6 +68,13 @@ public class ReconPipelineFactory extends PipelineFactory {
       throw new UnsupportedOperationException(
           "Trying to create pipeline in Recon, which is prohibited!");
     }
+    
+    @Override
+    public Pipeline createForRead(ReplicationConfig config,
+        Set<ContainerReplica> replicas) {
+      throw new UnsupportedOperationException(
+          "Trying to create pipeline in Recon, which is prohibited!");
+    }
 
     @Override
     public void close(Pipeline pipeline) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to