This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6eed1ec3d3 HDDS-7265. ScatterRackPolicy for Ratis pipeline provider
container placement (#4002)
6eed1ec3d3 is described below
commit 6eed1ec3d3e68e30861964a2dff042bd9b37be0b
Author: Neil Joshi <[email protected]>
AuthorDate: Fri Dec 9 15:06:50 2022 -0700
HDDS-7265. ScatterRackPolicy for Ratis pipeline provider container
placement (#4002)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 2 +
.../SCMContainerPlacementRackScatter.java | 26 ++-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 20 +-
.../pipeline/PipelinePlacementPolicyFactory.java | 52 +++++
.../hdds/scm/pipeline/RatisPipelineProvider.java | 34 +++-
.../TestSCMContainerPlacementRackScatter.java | 40 ++++
.../scm/pipeline/TestPipelinePlacementFactory.java | 215 +++++++++++++++++++++
.../scm/pipeline/TestPipelinePlacementPolicy.java | 15 +-
.../scm/pipeline/TestRatisPipelineProvider.java | 22 ++-
.../hadoop/ozone/TestOzoneConfigurationFields.java | 3 +-
10 files changed, 410 insertions(+), 19 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1c4e09ead9..c324c3e841 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -344,6 +344,8 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
"ozone.scm.container.placement.impl";
+ public static final String OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY =
+ "ozone.scm.pipeline.placement.impl";
public static final String OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY =
"ozone.scm.container.placement.ec.impl";
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index 2f86df8708..6b4f9d9116 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,19 @@ public final class SCMContainerPlacementRackScatter
this.metrics = metrics;
}
+ /**
+ * Constructor for Pipeline Provider Pipeline Placement with rack awareness.
+ * @param nodeManager Node Manager
+ * @param stateManager State Manager
+ * @param conf Configuration
+ */
+ public SCMContainerPlacementRackScatter(NodeManager nodeManager,
+ PipelineStateManager stateManager, ConfigurationSource conf) {
+ super(nodeManager, conf);
+ this.networkTopology = nodeManager.getClusterNetworkTopologyMap();
+ this.metrics = null;
+ }
+
public Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
List<Node> unavailableNodes,
List<DatanodeDetails> mutableFavoredNodes,
@@ -205,7 +219,9 @@ public final class SCMContainerPlacementRackScatter
"than 0, but the given num is " + nodesRequiredToChoose;
throw new SCMException(errorMsg, null);
}
- metrics.incrDatanodeRequestCount(nodesRequiredToChoose);
+ if (metrics != null) {
+ metrics.incrDatanodeRequestCount(nodesRequiredToChoose);
+ }
int nodesRequired = nodesRequiredToChoose;
int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
List<Node> availableNodes = networkTopology.getNodes(
@@ -363,7 +379,9 @@ public final class SCMContainerPlacementRackScatter
long metadataSizeRequired, long dataSizeRequired) {
int maxRetry = INNER_LOOP_MAX_RETRY;
while (true) {
- metrics.incrDatanodeChooseAttemptCount();
+ if (metrics != null) {
+ metrics.incrDatanodeChooseAttemptCount();
+ }
Node node = null;
try {
node = networkTopology.chooseRandom(scope, excludedNodes);
@@ -379,7 +397,9 @@ public final class SCMContainerPlacementRackScatter
DatanodeDetails datanodeDetails = (DatanodeDetails) node;
if (isValidNode(datanodeDetails, metadataSizeRequired,
dataSizeRequired)) {
- metrics.incrDatanodeChooseSuccessCount();
+ if (metrics != null) {
+ metrics.incrDatanodeChooseSuccessCount();
+ }
return node;
}
// exclude the unavailable node for the following retries.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 2287475d82..80374299b6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -83,7 +83,10 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
}
- int currentRatisThreePipelineCount(DatanodeDetails datanodeDetails) {
+ public static int currentRatisThreePipelineCount(
+ NodeManager nodeManager,
+ PipelineStateManager stateManager,
+ DatanodeDetails datanodeDetails) {
// Safe to cast collection's size to int
return (int) nodeManager.getPipelines(datanodeDetails).stream()
.map(id -> {
@@ -95,12 +98,12 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
return null;
}
})
- .filter(this::isNonClosedRatisThreePipeline)
+ .filter(PipelinePlacementPolicy::isNonClosedRatisThreePipeline)
.count();
}
- private boolean isNonClosedRatisThreePipeline(Pipeline p) {
- return p.getReplicationConfig()
+ private static boolean isNonClosedRatisThreePipeline(Pipeline p) {
+ return p != null && p.getReplicationConfig()
.equals(RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
&& !p.isClosed();
}
@@ -163,7 +166,8 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
// TODO check if sorting could cause performance issue: HDDS-3466.
List<DatanodeDetails> healthyList = healthyNodes.stream()
.map(d ->
- new DnWithPipelines(d, currentRatisThreePipelineCount(d)))
+ new DnWithPipelines(d, currentRatisThreePipelineCount(nodeManager,
+ stateManager, d)))
.filter(d ->
(d.getPipelines() < nodeManager.pipelineLimit(d.getDn())))
.sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
@@ -478,7 +482,11 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
return REQUIRED_RACKS;
}
- private static class DnWithPipelines {
+ /**
+ * Static nested utility class pairing a datanode with its pipeline
+ * count, used for pipeline engagement checking.
+ */
+ public static class DnWithPipelines {
private DatanodeDetails dn;
private int pipelines;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicyFactory.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicyFactory.java
new file mode 100644
index 0000000000..c57448fdab
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicyFactory.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
+
+/**
+ * Pipeline placement factory for pipeline providers to create a placement
instance
+ * based on the configuration property
+ * {@link ScmConfigKeys#OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY}
+ */
+public final class PipelinePlacementPolicyFactory {
+
+ private PipelinePlacementPolicyFactory() {
+ }
+
+ public static PlacementPolicy getPolicy(NodeManager nodeManager,
+ PipelineStateManager stateManager, ConfigurationSource conf) {
+ final Class<? extends PlacementPolicy> clazz
+ = conf.getClass(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+ PipelinePlacementPolicy.class, PlacementPolicy.class);
+
+ try {
+ return clazz.getDeclaredConstructor(NodeManager.class,
+ PipelineStateManager.class, ConfigurationSource.class)
+ .newInstance(nodeManager, stateManager, conf);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to getPolicy for " + clazz, e);
+ }
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 43b2e01c91..ad149fdec6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import java.io.IOException;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -36,7 +37,9 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import
org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicy.DnWithPipelines;
import
org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy;
import
org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicyFactory;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -59,7 +62,7 @@ public class RatisPipelineProvider
private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
- private final PipelinePlacementPolicy placementPolicy;
+ private final PlacementPolicy placementPolicy;
private int pipelineNumberLimit;
private int maxPipelinePerDatanode;
private final LeaderChoosePolicy leaderChoosePolicy;
@@ -77,8 +80,8 @@ public class RatisPipelineProvider
this.conf = conf;
this.eventPublisher = eventPublisher;
this.scmContext = scmContext;
- this.placementPolicy =
- new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+ this.placementPolicy = PipelinePlacementPolicyFactory
+ .getPolicy(nodeManager, stateManager, conf);
this.pipelineNumberLimit = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
@@ -163,6 +166,14 @@ public class RatisPipelineProvider
containerSizeBytes);
break;
case THREE:
+ List<DatanodeDetails> excludeDueToEngagement =
filterPipelineEngagement();
+ if (excludeDueToEngagement.size() > 0) {
+ if (excludedNodes.size() == 0) {
+ excludedNodes = excludeDueToEngagement;
+ } else {
+ excludedNodes.addAll(excludeDueToEngagement);
+ }
+ }
dns = placementPolicy.chooseDatanodes(excludedNodes,
favoredNodes, factor.getNumber(), minRatisVolumeSizeBytes,
containerSizeBytes);
@@ -222,6 +233,23 @@ public class RatisPipelineProvider
.collect(Collectors.toList()));
}
+ private List<DatanodeDetails> filterPipelineEngagement() {
+ List<DatanodeDetails> healthyNodes =
+ getNodeManager().getNodes(NodeStatus.inServiceHealthy());
+ List<DatanodeDetails> excluded = healthyNodes.stream()
+ .map(d ->
+ new DnWithPipelines(d,
+ PipelinePlacementPolicy
+ .currentRatisThreePipelineCount(getNodeManager(),
+ getPipelineStateManager(), d)))
+ .filter(d ->
+ (d.getPipelines() >= getNodeManager().pipelineLimit(d.getDn())))
+ .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
+ .map(d -> d.getDn())
+ .collect(Collectors.toList());
+ return excluded;
+ }
+
@Override
public void shutdown() {
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
index b49037e583..815c70ffb3 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
@@ -58,6 +58,7 @@ import java.util.stream.Stream;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
import static
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
@@ -556,6 +557,45 @@ public class TestSCMContainerPlacementRackScatter {
assertEquals(misReplication, stat.misReplicationCount());
}
+ @Test
+ public void testPipelineProviderRackScatter() throws SCMException {
+ setup(3, 1);
+ conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementRackScatter.class.getCanonicalName());
+ List<DatanodeDetails> usedDns = new ArrayList<>();
+ List<DatanodeDetails> excludedDns = new ArrayList<>();
+ List<DatanodeDetails> additionalNodes = policy.chooseDatanodes(usedDns,
+ excludedDns, null, 3, 0, 5);
+ assertPlacementPolicySatisfied(usedDns, additionalNodes, excludedDns, 3,
+ true, 0);
+ }
+
+ // Test for pipeline provider placement when the number of racks is less
+ // than the number of nodes required and the nodes cannot be fully
+ // scattered. In this case the placement spreads the nodes as evenly as
+ // possible: with 3 nodes required and 2 racks, it places 2 on one rack
+ // and 1 on the other; with only 1 rack, all nodes land on the same rack.
+ @Test
+ public void testPipelineProviderRackScatterFallback() throws SCMException {
+ setup(3, 2);
+ conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementRackScatter.class.getCanonicalName());
+ List<DatanodeDetails> usedDns = new ArrayList<>();
+ List<DatanodeDetails> excludedDns = new ArrayList<>();
+ List<DatanodeDetails> additionalNodes = policy.chooseDatanodes(usedDns,
+ excludedDns, null, 3, 0, 5);
+ assertPlacementPolicySatisfied(usedDns, additionalNodes, excludedDns, 3,
+ true, 0);
+
+ setup(3, 3);
+ additionalNodes = policy.chooseDatanodes(usedDns,
+ excludedDns, null, 3, 0, 5);
+ assertPlacementPolicySatisfied(usedDns, additionalNodes, excludedDns, 3,
+ true, 0);
+ }
+
+ // add test for pipeline engagement
+
@Test
public void testValidChooseNodesWithUsedNodes() throws SCMException {
setup(5, 2);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java
new file mode 100644
index 0000000000..9d7bae4810
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.io.File;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link PipelinePlacementPolicyFactory}.
+ */
+public class TestPipelinePlacementFactory {
+ private OzoneConfiguration conf;
+ private NodeManager nodeManager;
+ private NodeManager nodeManagerBase;
+ private PipelineStateManager stateManager;
+ private NetworkTopologyImpl cluster;
+ private final List<DatanodeDetails> datanodes = new ArrayList<>();
+ private final List<DatanodeInfo> dnInfos = new ArrayList<>();
+ private File testDir;
+ private DBStore dbStore;
+ private SCMHAManager scmhaManager;
+
+ private static final long STORAGE_CAPACITY = 100L;
+
+ @BeforeEach
+ public void setup() {
+ //initialize ozone config for tests
+ conf = new OzoneConfiguration();
+ }
+
+ private void setupRacks(int datanodeCount, int nodesPerRack)
+ throws Exception {
+ conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
+ 1, StorageUnit.BYTES);
+ NodeSchema[] schemas = new NodeSchema[]
+ {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+ NodeSchemaManager.getInstance().init(schemas, true);
+ cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+
+ // build datanodes, and network topology
+ String rack = "/rack";
+ String hostname = "node";
+ for (int i = 0; i < datanodeCount; i++) {
+ DatanodeDetails datanodeDetails =
+ MockDatanodeDetails.createDatanodeDetails(
+ hostname + i, rack + (i / nodesPerRack));
+
+ datanodes.add(datanodeDetails);
+ cluster.add(datanodeDetails);
+ DatanodeInfo datanodeInfo = new DatanodeInfo(
+ datanodeDetails, NodeStatus.inServiceHealthy(),
+ UpgradeUtils.defaultLayoutVersionProto());
+
+ StorageContainerDatanodeProtocolProtos.StorageReportProto storage1 =
+ HddsTestUtils.createStorageReport(
+ datanodeInfo.getUuid(), "/data1-" + datanodeInfo.getUuidString(),
+ STORAGE_CAPACITY, 0, 100L, null);
+ StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto
+ metaStorage1 =
+ HddsTestUtils.createMetadataStorageReport(
+ "/metadata1-" + datanodeInfo.getUuidString(),
+ STORAGE_CAPACITY, 0, 100L, null);
+ datanodeInfo.updateStorageReports(
+ new ArrayList<>(Arrays.asList(storage1)));
+ datanodeInfo.updateMetaDataStorageReports(
+ new ArrayList<>(Arrays.asList(metaStorage1)));
+ dnInfos.add(datanodeInfo);
+ }
+ nodeManagerBase = new MockNodeManager(cluster, datanodes,
+ false, 10);
+ nodeManager = Mockito.spy(nodeManagerBase);
+ for (DatanodeInfo dn: dnInfos) {
+ when(nodeManager.getNodeByUuid(dn.getUuidString()))
+ .thenReturn(dn);
+ }
+
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ scmhaManager = SCMHAManagerStub.getInstance(true);
+
+ stateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+ }
+
+ @Test
+ public void testDefaultPolicy() throws IOException {
+ PlacementPolicy policy = PipelinePlacementPolicyFactory
+ .getPolicy(null, null, conf);
+ Assertions.assertSame(PipelinePlacementPolicy.class, policy.getClass());
+ }
+
+ @Test
+ public void testRackScatterPolicy() throws Exception {
+ conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementRackScatter.class.getCanonicalName());
+ // for this test, rack setup does not matter, just
+ // need a non-null NetworkTopologyMap within the nodeManager
+ setupRacks(6, 3);
+ PlacementPolicy policy = PipelinePlacementPolicyFactory
+ .getPolicy(nodeManager, stateManager, conf);
+ Assertions.assertSame(SCMContainerPlacementRackScatter.class,
+ policy.getClass());
+ }
+
+ // test default rack aware pipeline provider placement - 3 racks
+ // pipeline created with 1 node on one rack and other 2 nodes
+ // on separate rack
+ @Test
+ public void testDefaultPipelineProviderRackPlacement() throws Exception {
+ setupRacks(6, 2);
+ PlacementPolicy policy = PipelinePlacementPolicyFactory
+ .getPolicy(nodeManager, stateManager, conf);
+
+ int nodeNum = 3;
+ List<DatanodeDetails> datanodeDetails =
+ policy.chooseDatanodes(null, null, nodeNum, 15, 15);
+ Assertions.assertEquals(nodeNum, datanodeDetails.size());
+ Assertions.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
+ datanodeDetails.get(2)));
+ Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+ datanodeDetails.get(1)));
+ Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
+ datanodeDetails.get(2)));
+ }
+
+ // test rack scatter pipeline provider placement - 3 racks
+ // pipeline created with node on each rack
+ @Test
+ public void testRackScatterPipelineProviderRackPlacement() throws Exception {
+ conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementRackScatter.class.getCanonicalName());
+
+ setupRacks(6, 2);
+ PlacementPolicy policy = PipelinePlacementPolicyFactory
+ .getPolicy(nodeManager, stateManager, conf);
+
+ int nodeNum = 3;
+ List<DatanodeDetails> excludedNodes = new ArrayList<>();
+ List<DatanodeDetails> favoredNodes = new ArrayList<>();
+ List<DatanodeDetails> datanodeDetails =
+ policy.chooseDatanodes(excludedNodes, excludedNodes, favoredNodes,
+ nodeNum, 15, 15);
+ Assertions.assertEquals(nodeNum, datanodeDetails.size());
+ Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+ datanodeDetails.get(2)));
+ Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
+ datanodeDetails.get(1)));
+ Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
+ datanodeDetails.get(2)));
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 9d5cadeb2d..82a2ab3246 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -692,7 +692,8 @@ public class TestPipelinePlacementPolicy {
createPipelineWithReplicationConfig(standaloneOneDn, STAND_ALONE, ONE);
pipelineCount
- = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(0));
+ = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+ stateManager, healthyNodes.get(0));
Assertions.assertEquals(pipelineCount, 0);
// Check datanode with one RATIS/ONE pipeline
@@ -701,7 +702,8 @@ public class TestPipelinePlacementPolicy {
createPipelineWithReplicationConfig(ratisOneDn, RATIS, ONE);
pipelineCount
- = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(1));
+ = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+ stateManager, healthyNodes.get(1));
Assertions.assertEquals(pipelineCount, 0);
// Check datanode with one RATIS/THREE pipeline
@@ -712,7 +714,8 @@ public class TestPipelinePlacementPolicy {
createPipelineWithReplicationConfig(ratisThreeDn, RATIS, THREE);
pipelineCount
- = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(2));
+ = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+ stateManager, healthyNodes.get(2));
Assertions.assertEquals(pipelineCount, 1);
// Check datanode with one RATIS/ONE and one STANDALONE/ONE pipeline
@@ -721,7 +724,8 @@ public class TestPipelinePlacementPolicy {
createPipelineWithReplicationConfig(standaloneOneDn, STAND_ALONE, ONE);
pipelineCount
- = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(1));
+ = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+ stateManager, healthyNodes.get(1));
Assertions.assertEquals(pipelineCount, 0);
// Check datanode with one RATIS/ONE and one STANDALONE/ONE pipeline and
@@ -734,7 +738,8 @@ public class TestPipelinePlacementPolicy {
createPipelineWithReplicationConfig(ratisThreeDn, RATIS, THREE);
pipelineCount
- = placementPolicy.currentRatisThreePipelineCount(healthyNodes.get(1));
+ = placementPolicy.currentRatisThreePipelineCount(nodeManager,
+ stateManager, healthyNodes.get(1));
Assertions.assertEquals(pipelineCount, 2);
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index b788fd713e..1d3d1ae473 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -58,8 +59,10 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.commons.collections.CollectionUtils.intersection;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -316,6 +319,23 @@ public class TestRatisPipelineProvider {
}
}
+ @Test
+ // Test pipeline provider with RackScatter policy cannot create
+ // pipeline due to nodes with full pipeline engagement.
+ public void testFactorTHREEPipelineRackScatterEngagement()
+ throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementRackScatter.class.getCanonicalName());
+ conf.set(OZONE_DATANODE_PIPELINE_LIMIT, "0");
+ init(0, conf);
+ List<DatanodeDetails> excludedNodes = new ArrayList<>();
+
+ Assertions.assertThrows(SCMException.class, () ->
+ provider.create(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
+ excludedNodes, Collections.EMPTY_LIST));
+ }
@Test
public void testCreatePipelinesWhenNotEnoughSpace() throws Exception {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 137ae08dad..502493eb40 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -132,7 +132,8 @@ public class TestOzoneConfigurationFields extends
TestConfigurationFieldsBase {
OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
- OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD
+ OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY
));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]