This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b3a2a3e HDDS-5441. Disallow same set of DNs to be part of multiple
pipelines. (#2416)
b3a2a3e is described below
commit b3a2a3eb856c30436b835e18bf6d5ceb88a64771
Author: bshashikant <[email protected]>
AuthorDate: Wed Aug 4 13:28:36 2021 +0530
HDDS-5441. Disallow same set of DNs to be part of multiple pipelines.
(#2416)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 +
.../common/src/main/resources/ozone-default.xml | 8 +
.../hadoop/hdds/scm/SCMCommonPlacementPolicy.java | 14 ++
.../java/org/apache/hadoop/hdds/scm/ScmUtils.java | 10 ++
.../apache/hadoop/hdds/scm/node/NodeManager.java | 9 ++
.../hadoop/hdds/scm/node/SCMNodeManager.java | 27 ++++
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 6 +
.../hdds/scm/pipeline/TestMultiRaftSetup.java | 173 +++++++++++++++++++++
8 files changed, 252 insertions(+)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 8d0aab3..0394591 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -356,6 +356,11 @@ public final class ScmConfigKeys {
"ozone.scm.datanode.pipeline.limit";
public static final int OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT = 2;
+ public static final String OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS =
+ "ozone.scm.datanode.disallow.same.peers";
+ public static final boolean OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS_DEFAULT =
+ false;
+
// Upper limit for how many pipelines can be created
// across the cluster nodes managed by SCM.
// Only for test purpose now.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9abfc1a..5e480ae 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -844,6 +844,14 @@
</description>
</property>
<property>
+ <name>ozone.scm.datanode.disallow.same.peers</name>
+ <value>false</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Disallows the same set of datanodes to participate in multiple
+      pipelines when set to true. Default is set to false.
+ </description>
+ </property>
+ <property>
<name>ozone.scm.ratis.pipeline.limit</name>
<value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index bec0db3..415a8e4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -50,6 +51,7 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
private final NodeManager nodeManager;
private final Random rand;
private final ConfigurationSource conf;
+ private final boolean shouldRemovePeers;
/**
* Return for replication factor 1 containers where the placement policy
@@ -73,6 +75,7 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
+ this.shouldRemovePeers = ScmUtils.shouldRemovePeers(conf);
}
/**
@@ -236,6 +239,7 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
// invoke the choose function defined in the derived classes.
DatanodeDetails nodeId = chooseNode(healthyNodes);
if (nodeId != null) {
+ removePeers(nodeId, healthyNodes);
results.add(nodeId);
}
}
@@ -314,4 +318,14 @@ public abstract class SCMCommonPlacementPolicy implements
PlacementPolicy {
return new ContainerPlacementStatusDefault(
(int)currentRackCount, requiredRacks, numRacks);
}
+
+ /**
+ * Removes the datanode peers from all the existing pipelines for this dn.
+ */
+ public void removePeers(DatanodeDetails dn,
+ List<DatanodeDetails> healthyList) {
+ if (shouldRemovePeers) {
+ healthyList.removeAll(nodeManager.getPeerList(dn));
+ }
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
index fa4bfcb..4ab9ec2 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
@@ -181,4 +181,14 @@ public final class ScmUtils {
"nodeId. If want to configure same port configure {}", confKey,
portKey, portKey);
}
+
+ public static boolean shouldRemovePeers(final ConfigurationSource conf) {
+ int pipelineLimitPerDn =
+ conf.getInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+ return (1 != pipelineLimitPerDn && conf
+ .getBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS,
+ ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS_DEFAULT));
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index a934d03..43008cd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -44,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.Collection;
/**
* A node manager supports a simple interface for managing a datanode.
@@ -307,6 +308,14 @@ public interface NodeManager extends
StorageContainerNodeProtocol,
int minPipelineLimit(List<DatanodeDetails> dn);
+ /**
+   * Gets the peers in all the pipelines for the particular datanode.
+   * @param dn datanode
+ */
+ default Collection<DatanodeDetails> getPeerList(DatanodeDetails dn) {
+ return null;
+ }
+
default HDDSLayoutVersionManager getLayoutVersionManager(){
return null;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index c045d07..c3db8d7 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -31,6 +31,8 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Collections;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
@@ -64,6 +66,8 @@ import
org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -885,6 +889,29 @@ public class SCMNodeManager implements NodeManager {
return Collections.min(pipelineCountList);
}
+ @Override
+ public Collection<DatanodeDetails> getPeerList(DatanodeDetails dn) {
+ HashSet<DatanodeDetails> dns = new HashSet<>();
+ Preconditions.checkNotNull(dn);
+ Set<PipelineID> pipelines =
+ nodeStateManager.getPipelineByDnID(dn.getUuid());
+ PipelineManager pipelineManager = scmContext.getScm().getPipelineManager();
+ if (!pipelines.isEmpty()) {
+ pipelines.forEach(id -> {
+ try {
+ Pipeline pipeline = pipelineManager.getPipeline(id);
+ List<DatanodeDetails> peers = pipeline.getNodes();
+ dns.addAll(peers);
+ } catch (PipelineNotFoundException pnfe) {
+ //ignore the pipeline not found exception here
+ }
+ });
+ }
+    // remove the node itself from the set, leaving only its peers
+ dns.remove(dn);
+ return dns;
+ }
+
/**
* Get set of pipelines a datanode is part of.
*
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index d330df6..942d04c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -290,6 +290,7 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor != null) {
results.add(anchor);
+ removePeers(anchor, healthyNodes);
exclude.add(anchor);
} else {
LOG.warn("Unable to find healthy node for anchor(first) node.");
@@ -309,6 +310,7 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
// Rack awareness is detected.
rackAwareness = true;
results.add(nextNode);
+ removePeers(nextNode, healthyNodes);
exclude.add(nextNode);
if (LOG.isDebugEnabled()) {
LOG.debug("Second node chosen: {}", nextNode);
@@ -339,6 +341,7 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
if (pick != null) {
results.add(pick);
+ removePeers(pick, healthyNodes);
exclude.add(pick);
LOG.debug("Remaining node chosen: {}", pick);
} else {
@@ -376,6 +379,9 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
DatanodeDetails selectedNode =
healthyNodes.get(getRand().nextInt(healthyNodes.size()));
healthyNodes.remove(selectedNode);
+ if (selectedNode != null) {
+ removePeers(selectedNode, healthyNodes);
+ }
return selectedNode;
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestMultiRaftSetup.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestMultiRaftSetup.java
new file mode 100644
index 0000000..84d025c
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestMultiRaftSetup.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+
+import org.apache.ozone.test.LambdaTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for MultiRaft set up.
+ */
+public class TestMultiRaftSetup {
+
+ private MiniOzoneCluster cluster;
+ private StorageContainerManager scm;
+ private NodeManager nodeManager;
+ private PipelineManager pipelineManager;
+
+ private long pipelineDestroyTimeoutInMillis;
+ private static final ReplicationConfig RATIS_THREE =
+ ReplicationConfig.fromTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+
+ public void init(int dnCount, OzoneConfiguration conf) throws Exception {
+ cluster =
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(dnCount).build();
+ conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
+ TimeUnit.MILLISECONDS);
+ pipelineDestroyTimeoutInMillis = 1000;
+ conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+ pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
+ cluster.waitForClusterToBeReady();
+ scm = cluster.getStorageContainerManager();
+ nodeManager = scm.getScmNodeManager();
+ pipelineManager = scm.getPipelineManager();
+ }
+
+ /**
+   * Shutdown MiniOzoneCluster.
+ */
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testMultiRaftSamePeers() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS,
+ false);
+ init(3, conf);
+ waitForPipelineCreated(2);
+ Assert.assertEquals(2, pipelineManager.getPipelines(ReplicationConfig
+ .fromTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+ ReplicationFactor.THREE)).size());
+ assertNotSamePeers();
+ shutdown();
+ }
+
+ @Test
+ public void testMultiRaftNotSamePeers() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS,
true);
+ init(3, conf);
+ waitForPipelineCreated(1);
+    // datanode pipeline limit is set to 2, but only one pipeline of 3
+    // datanodes will be created. Further pipeline creation should fail
+ Assert.assertEquals(1, pipelineManager.getPipelines(RATIS_THREE).size());
+ try {
+ pipelineManager.createPipeline(RATIS_THREE);
+ Assert.fail();
+ } catch (IOException ex) {
+ }
+ shutdown();
+ }
+
+ @Test
+ public void testMultiRaft() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_DATANODE_DISALLOW_SAME_PEERS,
true);
+ init(5, conf);
+ waitForPipelineCreated(2);
+    // datanode pipeline limit is set to 2, but only two Ratis THREE pipelines
+    // will be created. Further pipeline creation should fail.
+    // For example, with d1, d2, d3, d4, d5, only d1 d2 d3 and d1 d4 d5 can form
+    // pipelines, as none of the peers from any of the existing pipelines will
+    // be repeated
+ Assert.assertEquals(2, pipelineManager.getPipelines(RATIS_THREE).size());
+ List<DatanodeDetails> dns = nodeManager.getAllNodes().stream()
+ .filter((dn) -> nodeManager.getPipelinesCount(dn) > 2).collect(
+ Collectors.toList());
+ Assert.assertEquals(1, dns.size());
+ try {
+ pipelineManager.createPipeline(RATIS_THREE);
+ Assert.fail();
+ } catch (IOException ex) {
+ }
+ Collection<PipelineID> pipelineIds = nodeManager.getPipelines(dns.get(0));
+    // Only one datanode should have 3 pipelines in total: 1 RATIS ONE pipeline
+    // and 2 RATIS THREE pipelines
+ Assert.assertEquals(3, pipelineIds.size());
+ List<Pipeline> pipelines = new ArrayList<>();
+ pipelineIds.forEach((id) -> {
+ try {
+ pipelines.add(pipelineManager.getPipeline(id));
+ } catch (PipelineNotFoundException pnfe) {
+ }
+ });
+ Assert.assertEquals(1, pipelines.stream()
+ .filter((p) -> (p.getReplicationConfig().getRequiredNodes() == 1))
+ .collect(Collectors.toList()).size());
+ Assert.assertEquals(2, pipelines.stream()
+ .filter((p) -> (p.getReplicationConfig().getRequiredNodes() == 3))
+ .collect(Collectors.toList()).size());
+ shutdown();
+ }
+ private void assertNotSamePeers() {
+ nodeManager.getAllNodes().forEach((dn) ->{
+ Collection<DatanodeDetails> peers = nodeManager.getPeerList(dn);
+ Assert.assertFalse(peers.contains(dn));
+ List<DatanodeDetails> trimList = nodeManager.getAllNodes();
+ trimList.remove(dn);
+ Assert.assertTrue(peers.containsAll(trimList));
+ });
+ }
+
+ private void waitForPipelineCreated(int num) throws Exception {
+ LambdaTestUtils.await(10000, 500, () -> {
+ List<Pipeline> pipelines =
+ pipelineManager.getPipelines(RATIS_THREE);
+ return pipelines.size() == num;
+ });
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]