This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ef4bb7f278d HDDS-14369. RatisPipelineProvider does not honor OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT (#9609)
ef4bb7f278d is described below
commit ef4bb7f278df1374638d313e47a286581b63e0ce
Author: Russole <[email protected]>
AuthorDate: Wed Jan 14 01:30:15 2026 +0800
HDDS-14369. RatisPipelineProvider does not honor OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT (#9609)
---
.../hadoop/hdds/scm/node/SCMNodeManager.java | 11 +++--
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 11 +++--
.../hdds/scm/pipeline/RatisPipelineProvider.java | 16 +++----
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 22 +++++++++
.../scm/pipeline/TestPipelinePlacementPolicy.java | 54 ++++++++++++++++++++++
.../scm/pipeline/TestRatisPipelineProvider.java | 50 ++++++++++++++++++++
6 files changed, 146 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 26d9e49bafc..e2a17dd1d5c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -133,7 +133,7 @@ public class SCMNodeManager implements NodeManager {
private final boolean useHostname;
private final Map<String, Set<DatanodeID>> dnsToDnIdMap = new ConcurrentHashMap<>();
private final int numPipelinesPerMetadataVolume;
- private final int heavyNodeCriteria;
+ private final int datanodePipelineLimit;
private final HDDSLayoutVersionManager scmLayoutVersionManager;
private final EventPublisher scmNodeEventPublisher;
private final SCMContext scmContext;
@@ -195,8 +195,9 @@ public SCMNodeManager(
this.numPipelinesPerMetadataVolume =
conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME,
ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
- String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
- this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+ this.datanodePipelineLimit = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
this.numContainerPerVolume = conf.getInt(
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
@@ -1602,8 +1603,8 @@ public int totalHealthyVolumeCount() {
@Override
public int pipelineLimit(DatanodeDetails dn) {
try {
- if (heavyNodeCriteria > 0) {
- return heavyNodeCriteria;
+ if (datanodePipelineLimit > 0) {
+ return datanodePipelineLimit;
} else if (nodeStateManager.getNode(dn).getHealthyVolumeCount() > 0) {
return numPipelinesPerMetadataVolume *
nodeStateManager.getNode(dn).getMetaDataVolumeCount();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 696d6ecc336..366a33dccf2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -54,7 +54,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
- private final int heavyNodeCriteria;
+ private final int datanodePipelineLimit;
private static final int REQUIRED_RACKS = 2;
public static final String MULTIPLE_RACK_PIPELINE_MSG =
@@ -76,8 +76,9 @@ public PipelinePlacementPolicy(final NodeManager nodeManager,
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.stateManager = stateManager;
- String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
- this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+ this.datanodePipelineLimit = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
}
public static int currentRatisThreePipelineCount(
@@ -182,7 +183,7 @@ List<DatanodeDetails> filterViableNodes(
if (healthyList.size() < nodesRequired) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to find enough nodes that meet the criteria that" +
- " cannot engage in more than" + heavyNodeCriteria +
+ " cannot engage in more than" + datanodePipelineLimit +
" pipelines. Nodes required: " + nodesRequired + " Excluded: " +
excludedNodesSize + " Found:" +
healthyList.size() + " healthy nodes count in NodeManager: " +
@@ -191,7 +192,7 @@ List<DatanodeDetails> filterViableNodes(
msg = String.format("Pipeline creation failed because nodes are engaged" +
" in other pipelines and every node can only be engaged in" +
" max %d pipelines. Required %d. Found %d. Excluded: %d.",
- heavyNodeCriteria, nodesRequired, healthyList.size(),
+ datanodePipelineLimit, nodesRequired, healthyList.size(),
excludedNodesSize);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index b35a1f28148..800a7ebfd83 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -60,7 +60,7 @@ public class RatisPipelineProvider
private final EventPublisher eventPublisher;
private final PlacementPolicy placementPolicy;
private final int pipelineNumberLimit;
- private final int maxPipelinePerDatanode;
+ private final int datanodePipelineLimit;
private final LeaderChoosePolicy leaderChoosePolicy;
private final SCMContext scmContext;
private final long containerSizeBytes;
@@ -80,9 +80,9 @@ public RatisPipelineProvider(NodeManager nodeManager,
this.pipelineNumberLimit = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
- String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
- this.maxPipelinePerDatanode = dnLimit == null ? 0 :
- Integer.parseInt(dnLimit);
+ this.datanodePipelineLimit = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
this.containerSizeBytes = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
@@ -110,10 +110,10 @@ private boolean exceedPipelineNumberLimit(RatisReplicationConfig replicationConf
int closedPipelines = pipelineStateManager.getPipelines(replicationConfig,
PipelineState.CLOSED).size();
int openPipelines = totalActivePipelines - closedPipelines;
// Check per-datanode pipeline limit
- if (maxPipelinePerDatanode > 0) {
+ if (datanodePipelineLimit > 0) {
int healthyNodeCount = getNodeManager()
.getNodeCount(NodeStatus.inServiceHealthy());
- int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount)
+ int allowedOpenPipelines = (datanodePipelineLimit * healthyNodeCount)
/ replicationConfig.getRequiredNodes();
return openPipelines >= allowedOpenPipelines;
}
@@ -145,8 +145,8 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException {
if (exceedPipelineNumberLimit(replicationConfig)) {
- String limitInfo = (maxPipelinePerDatanode > 0)
- ? String.format("per datanode: %d", maxPipelinePerDatanode)
+ String limitInfo = (datanodePipelineLimit > 0)
+ ? String.format("per datanode: %d", datanodePipelineLimit)
: String.format(": %d", pipelineNumberLimit);
throw new SCMException(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 84b548a4c04..928e38295f5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -2059,6 +2059,28 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
}
}
+ /**
+ * Test that pipelineLimit() uses the default value when the config is not set.
+ */
+ @Test
+ public void testUsesDefaultPipelineLimitWhenUnset()
+ throws IOException, AuthenticationException {
+
+ // Creates node manager with config without limit set
+ OzoneConfiguration conf = getConf();
+ conf.unset(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+
+ // Registers datanode with healthy volumes
+ DatanodeDetails dn = registerWithCapacity(nodeManager);
+
+ // Calls pipelineLimit() and verifies returns default value
+ int limit = nodeManager.pipelineLimit(dn);
+ assertEquals(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT, limit);
+ }
+ }
+
private static Stream<Arguments> nodeStateTransitions() {
return Stream.of(
// start decommissioning or entering maintenance
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b9b5ac88455..7094a39c79e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -722,6 +722,60 @@ public void testCurrentRatisThreePipelineCount()
assertEquals(pipelineCount, 2);
}
+ @Test
+ public void testPipelinePlacementPolicyDefaultLimitFiltersNodeAtLimit()
+ throws IOException, TimeoutException {
+
+ // 1) Creates policy with config without limit set
+ OzoneConfiguration localConf = new OzoneConfiguration(conf);
+ localConf.unset(OZONE_DATANODE_PIPELINE_LIMIT);
+
+ MockNodeManager localNodeManager = new MockNodeManager(cluster,
+ getNodesWithRackAwareness(), false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+
+ // Ensure NodeManager uses default limit (=2) when limit is not set in conf
+ localNodeManager.setNumPipelinePerDatanode(
+ ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+
+ PipelineStateManager localStateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(localNodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+
+ PipelinePlacementPolicy localPolicy = new PipelinePlacementPolicy(
+ localNodeManager, localStateManager, localConf);
+
+ List<DatanodeDetails> healthy =
+ localNodeManager.getNodes(NodeStatus.inServiceHealthy());
+ DatanodeDetails target = healthy.get(0);
+
+ // 2) Adds exactly 2 pipelines to test node (default limit)
+ List<DatanodeDetails> p1Dns = new ArrayList<>();
+ p1Dns.add(target);
+ p1Dns.add(healthy.get(1));
+ p1Dns.add(healthy.get(2));
+ createPipelineWithReplicationConfig(p1Dns, RATIS, THREE);
+
+ List<DatanodeDetails> p2Dns = new ArrayList<>();
+ p2Dns.add(target);
+ p2Dns.add(healthy.get(3));
+ p2Dns.add(healthy.get(4));
+ createPipelineWithReplicationConfig(p2Dns, RATIS, THREE);
+
+ assertEquals(2, PipelinePlacementPolicy.currentRatisThreePipelineCount(
+ localNodeManager, localStateManager, target));
+
+ // 3) Verifies node is filtered out when choosing nodes for new pipeline
+ int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+ List<DatanodeDetails> chosen = localPolicy.chooseDatanodes(
+ new ArrayList<>(), new ArrayList<>(), nodesRequired, 0, 0);
+
+ assertEquals(nodesRequired, chosen.size());
+ assertThat(chosen).doesNotContain(target);
+ }
+
private void createPipelineWithReplicationConfig(List<DatanodeDetails> dnList,
HddsProtos.ReplicationType replicationType,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index cfeba61c320..ca9d1f5a6c3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -365,6 +365,56 @@ public void testCreatePipelinesWhenNotEnoughSpace(@TempDir File tempDir) throws
}
}
+ @Test
+ public void testCreatePipelineWithDefaultLimit() throws Exception {
+ // Create conf without setting OZONE_DATANODE_PIPELINE_LIMIT
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+
+ dbStore = DBStoreBuilder.createDBStore(conf, SCMDBDefinition.get());
+
+ // MockNodeManager(true, 10) typically gives 8 healthy nodes in this test suite.
+ nodeManager = new MockNodeManager(true, nodeCount);
+ // Give a large quota in MockNodeManager so we don't fail early due to mock quota.
+ nodeManager.setNumPipelinePerDatanode(100);
+
+ SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true);
+ stateManager = PipelineStateManagerImpl.newBuilder()
+ .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+
+ provider = new MockRatisPipelineProvider(nodeManager, stateManager, conf);
+
+ int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy()).size();
+ int defaultLimit = ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
+ assertEquals(2, defaultLimit);
+
+ // Max pipelines before exceeding per-DN default limit.
+ int maxPipelines = (healthyCount * defaultLimit)
+ / ReplicationFactor.THREE.getNumber();
+
+ // Create pipelines up to maxPipelines.
+ for (int i = 0; i < maxPipelines; i++) {
+ Pipeline p = provider.create(
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ new ArrayList<>(), new ArrayList<>());
+ stateManager.addPipeline(p.getProtobufMessage(ClientVersion.CURRENT_VERSION));
+ }
+
+ // Next pipeline creation should fail with default limit message.
+ SCMException ex = assertThrows(SCMException.class, () ->
+ provider.create(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ new ArrayList<>(), new ArrayList<>())
+ );
+
+ assertThat(ex.getMessage())
+ .contains("limit per datanode: " + defaultLimit)
+ .contains("replicationConfig: RATIS/THREE");
+ }
+
@ParameterizedTest
@CsvSource({ "1, 3", "2, 6"})
public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelineCount) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]