This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f5d2a0f548a HDDS-13590. Refactor HealthyPipelineSafeModeRule to not
use PipelineReportFromDatanode (#9651)
f5d2a0f548a is described below
commit f5d2a0f548a39ddccbd606c3039f88d8b02e8565
Author: Priyesh Karatha <[email protected]>
AuthorDate: Tue Mar 3 22:38:49 2026 +0530
HDDS-13590. Refactor HealthyPipelineSafeModeRule to not use
PipelineReportFromDatanode (#9651)
---
.../scm/safemode/HealthyPipelineSafeModeRule.java | 82 ++++++++++++++-
.../hadoop/hdds/scm/safemode/SafeModeMetrics.java | 8 +-
.../safemode/TestHealthyPipelineSafeModeRule.java | 113 +++++++++++++++++++++
3 files changed, 199 insertions(+), 4 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 94964df73a9..3e590013c11 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -123,13 +123,18 @@ protected synchronized boolean validate() {
LOG.info("All SCM pipelines are closed due to ongoing upgrade " +
"finalization. Bypassing healthy pipeline safemode rule.");
return true;
- } else {
- return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
}
+ if (!validateBasedOnReportProcessing()) {
+ return validateHealthyPipelineSafeModeRuleUsingPipelineManager();
+ }
+ return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
}
@Override
protected synchronized void process(Pipeline pipeline) {
+ if (!validateBasedOnReportProcessing()) {
+ return;
+ }
Objects.requireNonNull(pipeline, "pipeline == null");
// When SCM is in safe mode for long time, already registered
@@ -237,6 +242,61 @@ private synchronized void initializeRule(boolean refresh) {
healthyPipelineThresholdCount);
}
+ private boolean validateHealthyPipelineSafeModeRuleUsingPipelineManager() {
+ // Query PipelineManager directly for healthy pipeline count
+ List<Pipeline> openPipelines = pipelineManager.getPipelines(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
+ Pipeline.PipelineState.OPEN);
+
+ LOG.debug("Found {} open RATIS/THREE pipelines", openPipelines.size());
+
+ int pipelineCount = openPipelines.size();
+ healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+ (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
+
+ currentHealthyPipelineCount = (int) openPipelines.stream()
+ .filter(this::isPipelineHealthy)
+ .count();
+
+ getSafeModeMetrics().setNumCurrentHealthyPipelines(currentHealthyPipelineCount);
+ boolean isValid = currentHealthyPipelineCount >= healthyPipelineThresholdCount;
+ if (scmInSafeMode()) {
+ LOG.info("SCM in safe mode. Healthy pipelines: {}, threshold: {}, rule satisfied: {}",
+ currentHealthyPipelineCount, healthyPipelineThresholdCount, isValid);
+ } else {
+ LOG.debug("SCM not in safe mode. Healthy pipelines: {}, threshold: {}",
+ currentHealthyPipelineCount, healthyPipelineThresholdCount);
+ }
+ return isValid;
+ }
+
+ boolean isPipelineHealthy(Pipeline pipeline) {
+ // Verify pipeline has all 3 nodes
+ List<DatanodeDetails> nodes = pipeline.getNodes();
+ if (nodes.size() != 3) {
+ LOG.debug("Pipeline {} is not healthy: has {} nodes instead of 3",
+ pipeline.getId(), nodes.size());
+ return false;
+ }
+
+ // Verify all nodes are healthy
+ for (DatanodeDetails dn : nodes) {
+ try {
+ NodeStatus status = nodeManager.getNodeStatus(dn);
+ if (!status.equals(NodeStatus.inServiceHealthy())) {
+ LOG.debug("Pipeline {} is not healthy: DN {} has status - Health: {}, Operational State: {}",
+ pipeline.getId(), dn.getUuidString(), status.getHealth(), status.getOperationalState());
+ return false;
+ }
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Pipeline {} is not healthy: DN {} not found in node manager",
+ pipeline.getId(), dn.getUuidString());
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
protected synchronized void cleanup() {
processedPipelineIDs.clear();
@@ -265,6 +325,24 @@ public String getStatusText() {
private synchronized String updateStatusTextWithSamplePipelines(
String status) {
+ if (validateBasedOnReportProcessing()) {
+ List<Pipeline> openPipelines = pipelineManager.getPipelines(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
+ Pipeline.PipelineState.OPEN);
+
+ Set<PipelineID> unhealthyPipelines = openPipelines.stream()
+ .filter(p -> !isPipelineHealthy(p))
+ .map(Pipeline::getId)
+ .limit(SAMPLE_PIPELINE_DISPLAY_LIMIT)
+ .collect(Collectors.toSet());
+
+ if (!unhealthyPipelines.isEmpty()) {
+ String samplePipelineText =
+ "Sample pipelines not satisfying the criteria : " + unhealthyPipelines;
+ status = status.concat("\n").concat(samplePipelineText);
+ }
+ return status;
+ }
Set<PipelineID> samplePipelines =
unProcessedPipelineSet.stream().limit(SAMPLE_PIPELINE_DISPLAY_LIMIT)
.collect(Collectors.toSet());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java
index ae65eafcb91..1f1daaae09b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeMetrics.java
@@ -47,7 +47,7 @@ public class SafeModeMetrics {
// Pipeline metrics for safemode
private @Metric MutableGaugeLong numHealthyPipelinesThreshold;
- private @Metric MutableCounterLong currentHealthyPipelinesCount;
+ private @Metric MutableGaugeLong currentHealthyPipelinesCount;
private @Metric MutableGaugeLong
numPipelinesWithAtleastOneReplicaReportedThreshold;
private @Metric MutableCounterLong
@@ -72,6 +72,10 @@ public void incCurrentHealthyPipelinesCount() {
this.currentHealthyPipelinesCount.incr();
}
+ public void setNumCurrentHealthyPipelines(long val) {
+ this.currentHealthyPipelinesCount.set(val);
+ }
+
public void setNumPipelinesWithAtleastOneReplicaReportedThreshold(long val) {
this.numPipelinesWithAtleastOneReplicaReportedThreshold.set(val);
}
@@ -117,7 +121,7 @@ MutableGaugeLong getNumHealthyPipelinesThreshold() {
return numHealthyPipelinesThreshold;
}
- MutableCounterLong getCurrentHealthyPipelinesCount() {
+ MutableGaugeLong getCurrentHealthyPipelinesCount() {
return currentHealthyPipelinesCount;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index ec97e4df6a9..b1312512d56 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.safemode;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -295,6 +296,118 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines()
}
+ @Test
+ public void testHealthyPipelineThresholdIncreasesWithMorePipelinesAndReports()
+ throws Exception {
+ EventQueue eventQueue = new EventQueue();
+ SCMServiceManager serviceManager = new SCMServiceManager();
+ SCMContext scmContext = SCMContext.emptyContext();
+ List<ContainerInfo> containers =
+ new ArrayList<>(HddsTestUtils.getContainerInfo(1));
+
+ OzoneConfiguration config = new OzoneConfiguration();
+ MockNodeManager nodeManager = new MockNodeManager(true, 12);
+ ContainerManager containerManager = mock(ContainerManager.class);
+ when(containerManager.getContainers()).thenReturn(containers);
+ config.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFile.getPath());
+ config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ config.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
+ 0.5);
+ config.setInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE, 0);
+
+ SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(config);
+ try {
+ PipelineManagerImpl pipelineManager =
+ PipelineManagerImpl.newPipelineManager(
+ config,
+ SCMHAManagerStub.getInstance(true),
+ nodeManager,
+ scmMetadataStore.getPipelineTable(),
+ eventQueue,
+ scmContext,
+ serviceManager,
+ Clock.system(ZoneOffset.UTC));
+
+ PipelineProvider mockRatisProvider =
+ new MockRatisPipelineProvider(nodeManager,
+ pipelineManager.getStateManager(), config);
+ pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+ mockRatisProvider);
+
+ // Create all pipelines before SCM enters safe mode. Pipeline creation is
+ // blocked while safe mode prechecks have not yet passed.
+ Pipeline pipeline1 =
+ pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
+ ReplicationFactor.THREE));
+ Pipeline pipeline2 =
+ pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
+ ReplicationFactor.THREE));
+ Pipeline pipeline3 =
+ pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
+ ReplicationFactor.THREE));
+
+ // Start with one healthy open pipeline. Threshold is small at this point.
+ pipelineManager.openPipeline(pipeline1.getId());
+ pipeline1 = pipelineManager.getPipeline(pipeline1.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline1);
+
+ SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(config,
+ nodeManager, pipelineManager, containerManager, serviceManager,
+ eventQueue, scmContext);
+ scmSafeModeManager.start();
+
+ HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
+ SafeModeRuleFactory.getInstance()
+ .getSafeModeRule(HealthyPipelineSafeModeRule.class);
+ healthyPipelineSafeModeRule.setValidateBasedOnReportProcessing(false);
+
+ firePipelineEvent(pipeline1, eventQueue);
+ assertTrue(healthyPipelineSafeModeRule.validate());
+ assertEquals(1, healthyPipelineSafeModeRule.getHealthyPipelineThresholdCount());
+
+ // Open more pipelines so threshold increases.
+ pipelineManager.openPipeline(pipeline2.getId());
+ pipeline2 = pipelineManager.getPipeline(pipeline2.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline2);
+
+ pipelineManager.openPipeline(pipeline3.getId());
+ pipeline3 = pipelineManager.getPipeline(pipeline3.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
+
+ // Simulate DN reports causing pipelines to be considered unhealthy.
+ for (DatanodeDetails dn : pipeline2.getNodes()) {
+ nodeManager.setNodeState(dn, HddsProtos.NodeState.DEAD);
+ }
+ for (DatanodeDetails dn : pipeline3.getNodes()) {
+ nodeManager.setNodeState(dn, HddsProtos.NodeState.DEAD);
+ }
+ firePipelineEvent(pipeline2, eventQueue);
+ firePipelineEvent(pipeline3, eventQueue);
+
+ assertFalse(healthyPipelineSafeModeRule.validate());
+ assertEquals(2, healthyPipelineSafeModeRule.getHealthyPipelineThresholdCount());
+
+ // Simulate more DN reports and recovery to healthy state, then exit rule.
+ for (DatanodeDetails dn : pipeline1.getNodes()) {
+ nodeManager.setNodeState(dn, HddsProtos.NodeState.HEALTHY);
+ }
+ for (DatanodeDetails dn : pipeline2.getNodes()) {
+ nodeManager.setNodeState(dn, HddsProtos.NodeState.HEALTHY);
+ }
+ for (DatanodeDetails dn : pipeline3.getNodes()) {
+ nodeManager.setNodeState(dn, HddsProtos.NodeState.HEALTHY);
+ }
+ firePipelineEvent(pipeline1, eventQueue);
+ firePipelineEvent(pipeline2, eventQueue);
+ firePipelineEvent(pipeline3, eventQueue);
+
+ assertTrue(healthyPipelineSafeModeRule.validate());
+ assertEquals(2, healthyPipelineSafeModeRule.getHealthyPipelineThresholdCount());
+ } finally {
+ scmMetadataStore.getStore().close();
+ }
+ }
+
@Test
public void testPipelineIgnoredWhenDnIsUnhealthy() throws Exception {
EventQueue eventQueue = new EventQueue();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]