This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-14496-zdu
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-14496-zdu by this push:
     new bb546a28374 HDDS-14646. SCM should not close Ratis pipelines on Finalize (#9779)
bb546a28374 is described below

commit bb546a28374c1b5e06feebb9300b8c734b79bb4e
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Feb 24 21:39:39 2026 +0000

    HDDS-14646. SCM should not close Ratis pipelines on Finalize (#9779)
---
 .../upgrade/DataNodeUpgradeFinalizer.java          |  41 ------
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   4 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   6 -
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  12 --
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  39 +-----
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  17 +--
 .../hdds/scm/safemode/SafeModeRuleFactory.java     |   2 +-
 .../scm/server/upgrade/FinalizationManager.java    |   7 -
 .../server/upgrade/FinalizationManagerImpl.java    |   5 -
 .../upgrade/FinalizationStateManagerImpl.java      |  22 ---
 .../scm/server/upgrade/SCMUpgradeFinalizer.java    | 152 ++++++++++-----------
 .../hdds/scm/pipeline/MockPipelineManager.java     |  15 --
 .../hdds/scm/upgrade/TestScmFinalization.java      |  30 ----
 .../hadoop/hdds/upgrade/TestHddsUpgradeUtils.java  |  52 +------
 .../hadoop/hdds/upgrade/TestScmHAFinalization.java |  25 +---
 15 files changed, 85 insertions(+), 344 deletions(-)
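
In short: this patch stops SCM from closing Ratis pipelines and freezing pipeline creation during upgrade finalization. Instead, postFinalizeUpgrade() now waits until every HEALTHY datanode reports a metadata layout version (MLV) equal to its software layout version (SLV) before finalization completes. Below is a minimal, self-contained sketch of that wait condition only; FinalizationWaitSketch, DatanodeLayout and allHealthyDatanodesFinalized are illustrative stand-ins for the NodeManager/DatanodeInfo.getLastKnownLayoutVersion() calls used in the real change, not code from this commit.

import java.util.List;

final class FinalizationWaitSketch {

  // Illustrative stand-in for the MLV/SLV data SCM reads from each DatanodeInfo.
  record DatanodeLayout(boolean healthy, int mlv, int slv) { }

  // A HEALTHY datanode counts as finalized once its MLV has caught up to its SLV.
  static boolean allHealthyDatanodesFinalized(List<DatanodeLayout> datanodes) {
    return datanodes.stream()
        .filter(DatanodeLayout::healthy)
        .allMatch(dn -> dn.mlv() >= dn.slv());
  }

  public static void main(String[] args) {
    List<DatanodeLayout> reported = List.of(
        new DatanodeLayout(true, 3, 3),    // finalized
        new DatanodeLayout(true, 2, 3),    // still finalizing
        new DatanodeLayout(false, 1, 3));  // STALE/DEAD nodes are ignored
    System.out.println(allHealthyDatanodesFinalized(reported)); // prints false
  }
}

In the actual change below, this check runs in a loop that sleeps five seconds between passes, re-verifies SCM leadership each iteration via getTermOfLeader(), and skips nodes that disappear mid-iteration (NodeNotFoundException).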

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
index 5cb06e88e6c..769d556643c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
@@ -17,15 +17,8 @@
 
 package org.apache.hadoop.ozone.container.upgrade;
 
-import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_VALIDATION_FAILED;
-import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_IN_PROGRESS;
-import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_REQUIRED;
-
-import java.io.IOException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.LayoutFeature;
@@ -41,40 +34,6 @@ public DataNodeUpgradeFinalizer(HDDSLayoutVersionManager versionManager) {
     super(versionManager);
   }
 
-  @Override
-  public void preFinalizeUpgrade(DatanodeStateMachine dsm)
-      throws IOException {
-    if (!canFinalizeDataNode(dsm)) {
-      // DataNode is not yet ready to finalize.
-      // Reset the Finalization state.
-      getVersionManager().setUpgradeState(FINALIZATION_REQUIRED);
-      String msg = "Pre Finalization checks failed on the DataNode.";
-      logAndEmit(msg);
-      throw new UpgradeException(msg, PREFINALIZE_VALIDATION_FAILED);
-    }
-    getVersionManager().setUpgradeState(FINALIZATION_IN_PROGRESS);
-  }
-
-  private boolean canFinalizeDataNode(DatanodeStateMachine dsm) {
-    // Lets be sure that we do not have any open container before we return
-    // from here. This function should be called in its own finalizer thread
-    // context.
-    for (Container<?> ctr :
-        dsm.getContainer().getController().getContainers()) {
-      ContainerProtos.ContainerDataProto.State state = ctr.getContainerState();
-      long id = ctr.getContainerData().getContainerID();
-      switch (state) {
-      case OPEN:
-      case CLOSING:
-        LOG.warn("FinalizeUpgrade : Waiting for container {} to close, current "
-            + "state is: {}", id, state);
-        return false;
-      default:
-      }
-    }
-    return true;
-  }
-
   @Override
   public void finalizeLayoutFeature(LayoutFeature layoutFeature,
       DatanodeStateMachine dsm) throws UpgradeException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index e9a019945c1..e1cf9a45bf5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -408,9 +408,7 @@ default Collection<DatanodeDetails> getPeerList(DatanodeDetails dn) {
   default HDDSLayoutVersionManager getLayoutVersionManager() {
     return null;
   }
-
-  default void forceNodesToHealthyReadOnly() { }
-
+  
   /**
   * This API allows removal of only DECOMMISSIONED, IN_MAINTENANCE and DEAD nodes
    * from NodeManager data structures and cleanup memory.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 3289e7b312a..ff561411f34 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -1958,12 +1958,6 @@ public HDDSLayoutVersionManager getLayoutVersionManager() {
     return scmLayoutVersionManager;
   }
 
-  @VisibleForTesting
-  @Override
-  public void forceNodesToHealthyReadOnly() {
-    nodeStateManager.forceNodesToHealthyReadOnly();
-  }
-
   private ReentrantReadWriteLock.WriteLock writeLock() {
     return lock.writeLock();
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 6a448d6c88d..4af0d60634a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -180,18 +180,6 @@ default Pipeline waitOnePipelineReady(Collection<PipelineID> pipelineIDs,
   void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
       throws RocksDatabaseException, DuplicatedPipelineIdException, CodecException;
 
-  /**
-   * Ask pipeline manager to not create any new pipelines.
-   */
-  void freezePipelineCreation();
-
-  /**
-   * Ask pipeline manager to resume creating new pipelines.
-   */
-  void resumePipelineCreation();
-
-  boolean isPipelineCreationFrozen();
-
   /**
    * Acquire read lock.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 9c529e22e7e..bf33282aaf4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -31,7 +31,6 @@
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import javax.management.ObjectName;
@@ -56,7 +55,6 @@
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.db.CodecException;
 import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
@@ -93,9 +91,6 @@ public class PipelineManagerImpl implements PipelineManager {
   private final SCMHAManager scmhaManager;
   private SCMContext scmContext;
   private final NodeManager nodeManager;
-  // This allows for freezing/resuming the new pipeline creation while the
-  // SCM is already out of SafeMode.
-  private AtomicBoolean freezePipelineCreation;
   private final Clock clock;
 
   @SuppressWarnings("checkstyle:parameterNumber")
@@ -123,7 +118,6 @@ protected PipelineManagerImpl(ConfigurationSource conf,
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
-    this.freezePipelineCreation = new AtomicBoolean();
   }
 
   @SuppressWarnings("checkstyle:parameterNumber")
@@ -160,13 +154,7 @@ public static PipelineManagerImpl newPipelineManager(
 
     pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
     serviceManager.register(backgroundPipelineCreator);
-
-    if (FinalizationManager.shouldCreateNewPipelines(
-        scmContext.getFinalizationCheckpoint())) {
-      pipelineManager.resumePipelineCreation();
-    } else {
-      pipelineManager.freezePipelineCreation();
-    }
+    backgroundPipelineCreator.start();
 
     final long scrubberIntervalInMillis = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
@@ -276,13 +264,6 @@ private void checkIfPipelineCreationIsAllowed(
       throw new IOException("Pipeline creation is not allowed as safe mode " +
           "prechecks have not yet passed");
     }
-
-    if (freezePipelineCreation.get()) {
-      String message = "Cannot create new pipelines while pipeline creation " +
-          "is frozen.";
-      LOG.info(message);
-      throw new IOException(message);
-    }
   }
 
   private void addPipelineToManager(Pipeline pipeline)
@@ -803,24 +784,6 @@ public void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
     stateManager.reinitialize(pipelineStore);
   }
 
-  @Override
-  public void freezePipelineCreation() {
-    freezePipelineCreation.set(true);
-    backgroundPipelineCreator.stop();
-  }
-
-  @Override
-  public void resumePipelineCreation() {
-    freezePipelineCreation.set(false);
-    backgroundPipelineCreator.start();
-  }
-
-  @Override
-  public boolean isPipelineCreationFrozen() {
-    return freezePipelineCreation.get() &&
-        !backgroundPipelineCreator.isRunning();
-  }
-
   @Override
   public void close() throws IOException {
     if (backgroundPipelineCreator != null) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 94964df73a9..3ad70fefa04 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -32,14 +32,12 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
@@ -63,16 +61,14 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
   private final Set<PipelineID> processedPipelineIDs = new HashSet<>();
   private final PipelineManager pipelineManager;
   private final int minHealthyPipelines;
-  private final SCMContext scmContext;
   private final Set<PipelineID> unProcessedPipelineSet = new HashSet<>();
   private final NodeManager nodeManager;
 
   HealthyPipelineSafeModeRule(EventQueue eventQueue,
       PipelineManager pipelineManager, SCMSafeModeManager manager,
-      ConfigurationSource configuration, SCMContext scmContext, NodeManager nodeManager) {
+      ConfigurationSource configuration, NodeManager nodeManager) {
     super(manager, eventQueue);
     this.pipelineManager = pipelineManager;
-    this.scmContext = scmContext;
     this.nodeManager = nodeManager;
     healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
@@ -116,16 +112,7 @@ protected TypedEvent<Pipeline> getEventType() {
 
   @Override
   protected synchronized boolean validate() {
-    boolean shouldRunSafemodeCheck =
-        FinalizationManager.shouldCreateNewPipelines(
-            scmContext.getFinalizationCheckpoint());
-    if (!shouldRunSafemodeCheck) {
-      LOG.info("All SCM pipelines are closed due to ongoing upgrade " +
-          "finalization. Bypassing healthy pipeline safemode rule.");
-      return true;
-    } else {
-      return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
-    }
+    return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java
index 398eb19b56e..52dfbd17e9e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeRuleFactory.java
@@ -94,7 +94,7 @@ private void loadRules(SCMSafeModeManager safeModeManager) {
 
     if (pipelineManager != null) {
       safeModeRules.add(new HealthyPipelineSafeModeRule(eventQueue, pipelineManager,
-          safeModeManager, config, scmContext, nodeManager));
+          safeModeManager, config, nodeManager));
       safeModeRules.add(new OneReplicaPipelineSafeModeRule(eventQueue, pipelineManager,
           safeModeManager, config));
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
index 114063c5bce..421837971bb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
@@ -43,8 +43,6 @@ UpgradeFinalization.StatusAndMessages queryUpgradeFinalizationProgress(
   BasicUpgradeFinalizer<SCMUpgradeFinalizationContext, HDDSLayoutVersionManager>
       getUpgradeFinalizer();
 
-  boolean crossedCheckpoint(FinalizationCheckpoint checkpoint);
-
   FinalizationCheckpoint getCheckpoint();
 
   void buildUpgradeContext(NodeManager nodeManager,
@@ -55,11 +53,6 @@ void buildUpgradeContext(NodeManager nodeManager,
 
   void onLeaderReady();
 
-  static boolean shouldCreateNewPipelines(FinalizationCheckpoint checkpoint) {
-    return !checkpoint.hasCrossed(FinalizationCheckpoint.FINALIZATION_STARTED)
-        || checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
-  }
-
   static boolean shouldTellDatanodesToFinalize(
       FinalizationCheckpoint checkpoint) {
     return checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
index 5155aea425b..be65b933a1b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
@@ -132,11 +132,6 @@ HDDSLayoutVersionManager> getUpgradeFinalizer() {
     return upgradeFinalizer;
   }
 
-  @Override
-  public boolean crossedCheckpoint(FinalizationCheckpoint checkpoint) {
-    return finalizationStateManager.crossedCheckpoint(checkpoint);
-  }
-
   @Override
   public FinalizationCheckpoint getCheckpoint() {
     return finalizationStateManager.getFinalizationCheckpoint();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
index 77bc3499d9a..4dde4089f0a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.metadata.Replicate;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -72,27 +71,6 @@ private void publishCheckpoint(FinalizationCheckpoint checkpoint) {
     // Move the upgrade status according to this checkpoint. This is sent
     // back to the client if they query for the current upgrade status.
     versionManager.setUpgradeState(checkpoint.getStatus());
-
-    // Check whether this checkpoint change requires us to move node state.
-    // If this is necessary, it must be done before unfreezing pipeline
-    // creation to make sure nodes are not added to pipelines based on
-    // outdated layout information.
-    // This operation is not idempotent.
-    if (checkpoint == FinalizationCheckpoint.MLV_EQUALS_SLV) {
-      upgradeContext.getNodeManager().forceNodesToHealthyReadOnly();
-    }
-
-    // Check whether this checkpoint change requires us to freeze pipeline
-    // creation. These are idempotent operations.
-    PipelineManager pipelineManager = upgradeContext.getPipelineManager();
-    if (FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
-        pipelineManager.isPipelineCreationFrozen()) {
-      pipelineManager.resumePipelineCreation();
-    } else if (!FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
-          !pipelineManager.isPipelineCreationFrozen()) {
-      pipelineManager.freezePipelineCreation();
-    }
-
     // Set the checkpoint in the SCM context so other components can read it.
     upgradeContext.getSCMContext().setFinalizationCheckpoint(checkpoint);
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
index 6fb479592ad..553d1d09b19 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -17,14 +17,13 @@
 
 package org.apache.hadoop.hdds.scm.server.upgrade;
 
-import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
-
 import java.io.IOException;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
@@ -67,16 +66,11 @@ public void preFinalizeUpgrade(SCMUpgradeFinalizationContext context)
       context.getFinalizationStateManager().addFinalizingMark();
     }
     logCheckpointCrossed(FinalizationCheckpoint.FINALIZATION_STARTED);
-
-    if (!stateManager.crossedCheckpoint(
-        FinalizationCheckpoint.MLV_EQUALS_SLV)) {
-      closePipelinesBeforeFinalization(context.getPipelineManager());
-    }
   }
 
   @Override
   public void finalizeLayoutFeature(LayoutFeature lf,
-       SCMUpgradeFinalizationContext context) throws UpgradeException {
+      SCMUpgradeFinalizationContext context) throws UpgradeException {
     // Run upgrade actions, update VERSION file, and update layout version in
     // DB.
     try {
@@ -115,89 +109,89 @@ public void postFinalizeUpgrade(SCMUpgradeFinalizationContext context)
         context.getFinalizationStateManager();
     if (!stateManager.crossedCheckpoint(
         FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
-      createPipelinesAfterFinalization(context);
+      waitForDatanodesToFinalize(context);
       stateManager.removeFinalizingMark();
     }
   }
 
-  private void closePipelinesBeforeFinalization(PipelineManager pipelineManager)
-      throws IOException {
-    /*
-     * Before we can call finalize the feature, we need to make sure that
-     * all existing pipelines are closed and pipeline Manger would freeze
-     * all new pipeline creation.
-     */
-    String msg = "  Existing pipelines and containers will be closed " +
-        "during Upgrade.";
-    msg += "\n  New pipelines creation will remain frozen until Upgrade " +
-        "is finalized.";
-
-    // Pipeline creation should already be frozen when the finalization state
-    // manager set the checkpoint.
-    if (!pipelineManager.isPipelineCreationFrozen()) {
-      throw new SCMException("Error during finalization. Pipeline creation" +
-          "should have been frozen before closing existing pipelines.",
-          SCMException.ResultCodes.INTERNAL_ERROR);
-    }
-
-    for (Pipeline pipeline : pipelineManager.getPipelines()) {
-      if (pipeline.getPipelineState() != CLOSED) {
-        pipelineManager.closePipeline(pipeline.getId());
-      }
-    }
-
-    // We can not yet move all the existing data nodes to HEALTHY-READONLY
-    // state since the next heartbeat will move them back to HEALTHY state.
-    // This has to wait till postFinalizeUpgrade, when SCM MLV version is
-    // already upgraded as part of finalize processing.
-    // While in this state, it should be safe to do finalize processing for
-    // all new features. This will also update ondisk mlv version. Any
-    // disrupting upgrade can add a hook here to make sure that SCM is in a
-    // consistent state while finalizing the upgrade.
-
-    logAndEmit(msg);
-  }
+  /**
+   * Wait for all HEALTHY datanodes to complete finalization before finishing
+   * SCM finalization. This ensures that when the client receives a
+   * FINALIZATION_DONE status, all healthy datanodes have also finalized.
+   *
+   * A datanode is considered finalized when its metadata layout version (MLV)
+   * equals its software layout version (SLV), indicating it has completed
+   * processing all layout features.
+   *
+   * @param context The finalization context containing node manager reference
+   * @throws SCMException if waiting is interrupted or SCM loses leadership
+   * @throws NotLeaderException if SCM is no longer the leader
+   */
+  private void waitForDatanodesToFinalize(SCMUpgradeFinalizationContext context)
+      throws SCMException, NotLeaderException {
+    NodeManager nodeManager = context.getNodeManager();
 
-  private void createPipelinesAfterFinalization(
-      SCMUpgradeFinalizationContext context) throws SCMException,
-      NotLeaderException {
-    // Pipeline creation should already be resumed when the finalization state
-    // manager set the checkpoint.
-    PipelineManager pipelineManager = context.getPipelineManager();
-    if (pipelineManager.isPipelineCreationFrozen()) {
-      throw new SCMException("Error during finalization. Pipeline creation " +
-          "should have been resumed before waiting for new pipelines.",
-          SCMException.ResultCodes.INTERNAL_ERROR);
-    }
+    LOG.info("Waiting for all HEALTHY datanodes to complete finalization before finishing SCM finalization.");
 
-    // Wait for at least one pipeline to be created before finishing
-    // finalization, so clients can write.
-    boolean hasPipeline = false;
-    while (!hasPipeline) {
+    boolean allDatanodesFinalized = false;
+    while (!allDatanodesFinalized) {
       // Break out of the wait and step down from driving finalization if this
       // SCM is no longer the leader by throwing NotLeaderException.
       context.getSCMContext().getTermOfLeader();
 
-      ReplicationConfig ratisThree =
-          ReplicationConfig.fromProtoTypeAndFactor(
-              HddsProtos.ReplicationType.RATIS,
-              HddsProtos.ReplicationFactor.THREE);
-      int pipelineCount =
-          pipelineManager.getPipelines(ratisThree, Pipeline.PipelineState.OPEN)
-              .size();
-
-      hasPipeline = (pipelineCount >= 1);
-      if (!hasPipeline) {
-        LOG.info("Waiting for at least one open Ratis 3 pipeline after SCM " +
-            "finalization.");
+      allDatanodesFinalized = true;
+      int totalHealthyNodes = 0;
+      int finalizedNodes = 0;
+      int unfinalizedNodes = 0;
+
+      for (DatanodeDetails dn : nodeManager.getAllNodes()) {
+        try {
+          // Only check HEALTHY nodes. STALE/DEAD nodes will be told to
+          // finalize when they recover.
+          if (nodeManager.getNodeStatus(dn).isHealthy()) {
+            totalHealthyNodes++;
+            DatanodeInfo datanodeInfo = nodeManager.getDatanodeInfo(dn);
+            if (datanodeInfo == null) {
+              LOG.warn("Could not get DatanodeInfo for {}, skipping in " +
+                  "finalization wait.", dn.getHostName());
+              continue;
+            }
+
+            LayoutVersionProto dnLayout = datanodeInfo.getLastKnownLayoutVersion();
+            int dnMlv = dnLayout.getMetadataLayoutVersion();
+            int dnSlv = dnLayout.getSoftwareLayoutVersion();
+
+            if (dnMlv < dnSlv) {
+              // Datanode has not yet finalized
+              allDatanodesFinalized = false;
+              unfinalizedNodes++;
+              LOG.debug("Datanode {} has not yet finalized: MLV={}, SLV={}",
+                  dn.getHostName(), dnMlv, dnSlv);
+            } else {
+              finalizedNodes++;
+            }
+          }
+        } catch (NodeNotFoundException e) {
+          // Node was removed while we were iterating. This is OK, skip it.
+          LOG.debug("Node {} not found while waiting for finalization, " +
+              "skipping.", dn);
+        }
+      }
+
+      if (!allDatanodesFinalized) {
+        LOG.info("Waiting for datanodes to finalize. Status: {}/{} healthy " +
+                "datanodes have finalized ({} remaining).",
+            finalizedNodes, totalHealthyNodes, unfinalizedNodes);
         try {
           Thread.sleep(5000);
         } catch (InterruptedException e) {
-          // Try again on next loop iteration.
           Thread.currentThread().interrupt();
+          throw new SCMException("Interrupted while waiting for datanodes to " +
+              "finalize.", SCMException.ResultCodes.INTERNAL_ERROR);
         }
       } else {
-        LOG.info("Open pipeline found after SCM finalization");
+        LOG.info("All {} HEALTHY datanodes have completed finalization.",
+            totalHealthyNodes);
       }
     }
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index d6a3fc54635..dbe551fcda7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -287,16 +287,6 @@ public boolean getSafeModeStatus() {
   public void reinitialize(Table<PipelineID, Pipeline> pipelineStore) {
   }
 
-  @Override
-  public void freezePipelineCreation() {
-
-  }
-
-  @Override
-  public void resumePipelineCreation() {
-
-  }
-
   @Override
   public void close() {
   }
@@ -326,11 +316,6 @@ public void releaseWriteLock() {
 
   }
 
-  @Override
-  public boolean isPipelineCreationFrozen() {
-    return false;
-  }
-
   @Override
   public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) {
     return false;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
index 67e661bc9d0..c8dce937e41 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
@@ -21,14 +21,12 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
@@ -68,10 +66,6 @@ public class TestScmFinalization {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestScmFinalization.class);
 
-  // Indicates the current state of the mock pipeline manager's pipeline
-  // creation.
-  private boolean pipelineCreationFrozen = false;
-
   /**
    * Order of finalization checkpoints within the enum is used to determine
    * which ones have been passed. If ordering within the enum is changed
@@ -253,9 +247,6 @@ public void testResumeFinalizationFromCheckpoint(
         matches(OzoneConsts.FINALIZING_KEY),
         matches(""));
 
-    // Next, all pipeline creation should be stopped.
-    inOrder.verify(pipelineManager, count).freezePipelineCreation();
-
     if (initialCheckpoint == FinalizationCheckpoint.FINALIZATION_STARTED) {
       count = times(1);
     }
@@ -267,20 +258,12 @@ public void testResumeFinalizationFromCheckpoint(
         inOrder.verify(storage, count)
             .setLayoutVersion(feature.layoutVersion());
         inOrder.verify(storage, count).persistCurrentState();
-        // After MLV == SLV, all datanodes should be moved to healthy readonly.
-        if (feature.layoutVersion() ==
-            HDDSLayoutVersionManager.maxLayoutVersion()) {
-          inOrder.verify(nodeManager, count).forceNodesToHealthyReadOnly();
-        }
         inOrder.verify(buffer, count).addToBuffer(
             eq(finalizationStore),
             matches(OzoneConsts.LAYOUT_VERSION_KEY),
             eq(String.valueOf(feature.layoutVersion())));
       }
     }
-    // If this was not called in the loop, there was an error. To detect this
-    // mistake, verify again here.
-    verify(nodeManager, count).forceNodesToHealthyReadOnly();
 
     if (initialCheckpoint == FinalizationCheckpoint.MLV_EQUALS_SLV) {
       count = times(1);
@@ -351,19 +334,6 @@ private PipelineManager getMockPipelineManager(
     when(pipelineManager.getPipelines(any(),
         any())).thenReturn(Arrays.asList(null, null, null));
 
-    // Set the initial value for pipeline creation based on the checkpoint.
-    // In a real cluster, this would be set on startup of the
-    // PipelineManagerImpl.
-    pipelineCreationFrozen =
-        !FinalizationManager.shouldCreateNewPipelines(inititalCheckpoint);
-    doAnswer(args -> pipelineCreationFrozen = true)
-        .when(pipelineManager).freezePipelineCreation();
-    doAnswer(args -> pipelineCreationFrozen = false)
-        .when(pipelineManager).resumePipelineCreation();
-
-    doAnswer(args -> pipelineCreationFrozen)
-        .when(pipelineManager).isPipelineCreationFrozen();
-
     return pipelineManager;
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
index aa7c78b2e5a..d9ea4f57f89 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
@@ -17,11 +17,8 @@
 
 package org.apache.hadoop.hdds.upgrade;
 
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
-import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
 import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.ALREADY_FINALIZED;
 import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_DONE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -31,16 +28,13 @@
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
@@ -60,10 +54,6 @@ public final class TestHddsUpgradeUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestHddsUpgradeUtils.class);
 
-  private static final ReplicationConfig RATIS_THREE =
-      ReplicationConfig.fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
-          HddsProtos.ReplicationFactor.THREE);
-
   private TestHddsUpgradeUtils() { }
 
   public static void waitForFinalizationFromClient(
@@ -117,32 +107,13 @@ public static void testPostUpgradeConditionsSCM(StorageContainerManager scm,
         scmVersionManager.getMetadataLayoutVersion());
     assertThat(scmVersionManager.getMetadataLayoutVersion()).isGreaterThanOrEqualTo(1);
 
-    // SCM should not return from finalization until there is at least one
-    // pipeline to use.
-    PipelineManager scmPipelineManager = scm.getPipelineManager();
-    try {
-      GenericTestUtils.waitFor(
-          () -> !scmPipelineManager.getPipelines(RATIS_THREE, OPEN).isEmpty(),
-          500, 60000);
-    } catch (TimeoutException | InterruptedException e) {
-      fail("Timeout waiting for Upgrade to complete on SCM.");
-    }
-
-    // SCM will not return from finalization until there is at least one
-    // RATIS 3 pipeline. For this to exist, all three of our datanodes must
-    // be in the HEALTHY state.
+    // SCM will not return from finalization until all HEALTHY datanodes
+    // have completed their finalization (MLV == SLV). This ensures datanodes
+    // are ready to serve requests even though containers may remain OPEN.
     testDataNodesStateOnSCM(scm, numDatanodes, HEALTHY, HEALTHY_READONLY);
 
     int countContainers = 0;
-    for (ContainerInfo ci : scm.getContainerManager().getContainers()) {
-      HddsProtos.LifeCycleState ciState = ci.getState();
-      LOG.info("testPostUpgradeConditionsSCM: container state is {}",
-          ciState.name());
-      assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
-          (ciState == HddsProtos.LifeCycleState.CLOSING) ||
-          (ciState == HddsProtos.LifeCycleState.DELETING) ||
-          (ciState == HddsProtos.LifeCycleState.DELETED) ||
-          (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
+    for (ContainerInfo ignored : scm.getContainerManager().getContainers()) {
       countContainers++;
     }
     assertThat(countContainers).isGreaterThanOrEqualTo(numContainers);
@@ -180,14 +151,6 @@ public static void testPreUpgradeConditionsDataNodes(
   public static void testPostUpgradeConditionsDataNodes(
       List<HddsDatanodeService> datanodes, int numContainers,
       ContainerProtos.ContainerDataProto.State... validClosedContainerStates) {
-    List<ContainerProtos.ContainerDataProto.State> closeStates =
-        Arrays.asList(validClosedContainerStates);
-    // Allow closed and quasi closed containers as valid closed containers by
-    // default.
-    if (closeStates.isEmpty()) {
-      closeStates = Arrays.asList(CLOSED, QUASI_CLOSED);
-    }
-
     try {
       GenericTestUtils.waitFor(() -> {
         for (HddsDatanodeService dataNode : datanodes) {
@@ -217,12 +180,9 @@ public static void testPostUpgradeConditionsDataNodes(
          dnVersionManager.getMetadataLayoutVersion());
      assertThat(dnVersionManager.getMetadataLayoutVersion()).isGreaterThanOrEqualTo(1);
 
-      // Also verify that all the existing containers are closed.
-      for (Container<?> container :
+      // Verify containers are in acceptable states (OPEN is now allowed).
+      for (Container<?> ignored :
           dsm.getContainer().getController().getContainers()) {
-        assertTrue(closeStates.stream().anyMatch(
-                state -> container.getContainerState().equals(state)),
-            "Container had unexpected state " + container.getContainerState());
         countContainers++;
       }
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
index e4960cce160..da3cb82e68d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -88,6 +88,7 @@ public void init(OzoneConfiguration conf,
 
     conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY, HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
     conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SERVER_RPC_FIRST_ELECTION_TIMEOUT, "5s");
+    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT, "1s");
 
     MiniOzoneHAClusterImpl.Builder clusterBuilder = MiniOzoneCluster.newHABuilder(conf);
     clusterBuilder.setNumOfStorageContainerManagers(NUM_SCMS)
@@ -330,45 +331,21 @@ private void checkMidFinalizationConditions(
       // At least one node (leader) should be in the FINALIZATION_REQUIRED stage.
       assertTrue(scms.stream().anyMatch(scm ->
           scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_REQUIRED));
-      // Pipeline creation should not be frozen at this point, even on leader.
-      assertTrue(scms.stream().noneMatch(scm ->
-          scm.getPipelineManager().isPipelineCreationFrozen()));
       break;
     case AFTER_PRE_FINALIZE_UPGRADE:
       // At least one node (leader) should be in the FINALIZATION_STARTED stage.
       assertTrue(scms.stream().anyMatch(scm ->
           scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_STARTED));
-      // Pipeline creation should be frozen on nodes where the finalization checkpoint is FINALIZATION_STARTED,
-      // this should include the leader SCM.
-      assertTrue(scms.stream()
-          .filter(scm ->
-              scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_STARTED)
-          .allMatch(scm ->
-              scm.getPipelineManager().isPipelineCreationFrozen()));
       break;
     case AFTER_COMPLETE_FINALIZATION:
       // At least one node (leader) should be in the MLV_EQUALS_SLV stage.
       assertTrue(scms.stream().anyMatch(scm ->
           scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.MLV_EQUALS_SLV));
-      // Pipeline creation should not be frozen on nodes where the finalization checkpoint is MLV_EQUALS_SLV,
-      // this should include the leader SCM.
-      assertTrue(scms.stream()
-          .filter(scm ->
-              scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.MLV_EQUALS_SLV)
-          .noneMatch(scm ->
-              scm.getPipelineManager().isPipelineCreationFrozen()));
       break;
     case AFTER_POST_FINALIZE_UPGRADE:
       // At least one node (leader) should be in the FINALIZATION_COMPLETE stage.
       assertTrue(scms.stream().anyMatch(scm ->
           scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_COMPLETE));
-      // Pipeline creation should not be frozen on nodes where the finalization checkpoint is FINALIZATION_COMPLETE,
-      // this should include the leader SCM.
-      assertTrue(scms.stream()
-          .filter(scm ->
-              scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_COMPLETE)
-          .noneMatch(scm ->
-              scm.getPipelineManager().isPipelineCreationFrozen()));
       break;
     default:
       fail("Unknown halting point in test: " + haltingPoint);



