This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e8f5f30ea HDDS-6761. [SCM HA finalization] Handle restarts, crashes, 
and leader changes. (#3534)
2e8f5f30ea is described below

commit 2e8f5f30ea070a710ca9ea4caf39a8208216707d
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Jul 12 18:31:40 2022 -0400

    HDDS-6761. [SCM HA finalization] Handle restarts, crashes, and leader 
changes. (#3534)
---
 .../upgrade/AbstractLayoutVersionManager.java      |  17 +-
 .../ozone/upgrade/BasicUpgradeFinalizer.java       |  59 ++--
 .../DefaultUpgradeFinalizationExecutor.java        |   6 +-
 .../InjectedUpgradeFinalizationExecutor.java       |   3 -
 .../ozone/upgrade/TestBasicUpgradeFinalizer.java   | 105 +++++-
 .../hadoop/ozone/upgrade/UpgradeTestUtils.java     |  59 ++++
 .../upgrade/DataNodeUpgradeFinalizer.java          |   2 +
 .../org/apache/hadoop/hdds/scm/ha/SCMContext.java  |   4 +-
 .../hadoop/hdds/scm/ha/SCMHAInvocationHandler.java |   8 +-
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |   3 +
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |   1 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |   6 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  29 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    |   7 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   2 +
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  20 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  19 +-
 .../hdds/scm/safemode/SCMSafeModeManager.java      |   2 +-
 .../scm/server/upgrade/FinalizationCheckpoint.java |  25 +-
 .../scm/server/upgrade/FinalizationManager.java    |  16 +
 .../server/upgrade/FinalizationManagerImpl.java    |  38 +++
 .../server/upgrade/FinalizationStateManager.java   |   7 +
 .../upgrade/FinalizationStateManagerImpl.java      | 150 ++++++++-
 .../scm/server/upgrade/SCMUpgradeFinalizer.java    |  73 ++---
 .../hdds/scm/pipeline/MockPipelineManager.java     |   5 +
 .../hdds/scm/upgrade/TestScmFinalization.java      |  52 ++-
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java |  13 +-
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       | 225 ++-----------
 .../hadoop/hdds/upgrade/TestHddsUpgradeUtils.java  | 272 ++++++++++++++++
 .../hadoop/hdds/upgrade/TestScmHAFinalization.java | 352 +++++++++++++++++++++
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  37 +--
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |  28 +-
 32 files changed, 1272 insertions(+), 373 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 7e043d5b31..e389f50605 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.ozone.upgrade;
 
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 
 import java.io.IOException;
@@ -55,7 +54,7 @@ public abstract class AbstractLayoutVersionManager<T extends 
LayoutFeature>
   protected final TreeMap<Integer, LayoutFeature> features = new TreeMap<>();
   @VisibleForTesting
   protected final Map<String, LayoutFeature> featureMap = new HashMap<>();
-  private volatile Status currentUpgradeState = FINALIZATION_REQUIRED;
+  private volatile Status currentUpgradeState;
   // Allows querying upgrade state while an upgrade is in progress.
   // Note that MLV may have been incremented during the upgrade
   // by the time the value is read/used.
@@ -75,6 +74,8 @@ public abstract class AbstractLayoutVersionManager<T extends 
LayoutFeature>
                 metadataLayoutVersion, softwareLayoutVersion));
       } else if (metadataLayoutVersion == softwareLayoutVersion) {
         currentUpgradeState = ALREADY_FINALIZED;
+      } else {
+        currentUpgradeState = FINALIZATION_REQUIRED;
       }
 
       LayoutFeature mlvFeature = features.get(metadataLayoutVersion);
@@ -125,7 +126,7 @@ public abstract class AbstractLayoutVersionManager<T 
extends LayoutFeature>
         metadataLayoutVersion = layoutFeature.layoutVersion();
         LOG.info("Layout feature {} has been finalized.", layoutFeature);
         if (!needsFinalization()) {
-          completeFinalization();
+          LOG.info("Finalization is complete.");
         }
       } else {
         String msgStart = "";
@@ -148,16 +149,6 @@ public abstract class AbstractLayoutVersionManager<T 
extends LayoutFeature>
     }
   }
 
-  private void completeFinalization() {
-    lock.writeLock().lock();
-    try {
-      currentUpgradeState = FINALIZATION_DONE;
-      LOG.info("Finalization is complete.");
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
   private boolean softwareIsBehindMetaData() {
     lock.readLock().lock();
     try {
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
index fa4af7a231..400fffc3fa 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
@@ -26,6 +26,7 @@ import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOU
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_ACTION_VALIDATION_FAILED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
 
@@ -46,9 +47,10 @@ import 
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType;
 import org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 
 /**
- * UpgradeFinalizer implementation for the Storage Container Manager service.
+ * Base UpgradeFinalizer implementation to be extended by services.
  */
 public abstract class BasicUpgradeFinalizer
     <T, V extends AbstractLayoutVersionManager> implements UpgradeFinalizer<T> 
{
@@ -82,29 +84,37 @@ public abstract class BasicUpgradeFinalizer
     // sets the finalization status to FINALIZATION_IN_PROGRESS.
     // Therefore, a lock is used to make sure only one finalization thread is
     // running at a time.
-    StatusAndMessages response = initFinalize(upgradeClientID, service);
+    if (isFinalized(versionManager.getUpgradeState())) {
+      return FINALIZED_MSG;
+    }
     if (finalizationLock.tryLock()) {
       try {
-        // Even if the status indicates finalization completed, the component
-        // may not have finished all its specific steps if finalization was
-        // interrupted, so we should re-invoke them here.
+        StatusAndMessages response = initFinalize(upgradeClientID, service);
+        // If we were able to enter the lock and finalization status is "in
+        // progress", we should resume finalization because the last attempt
+        // was interrupted. If an attempt was currently ongoing, the lock
+        // would have been held.
         if (response.status() == FINALIZATION_REQUIRED ||
-            !componentFinishedFinalizationSteps(service)) {
+            response.status() == FINALIZATION_IN_PROGRESS) {
           finalizationExecutor.execute(service, this);
-          response = STARTING_MSG;
+          return STARTING_MSG;
         }
+        // Else, the initial response we got from initFinalize can be used,
+        // since we do not need to start/resume finalization.
+        return response;
+      } catch (NotLeaderException e) {
+        LOG.info("Leader change encountered during finalization. This " +
+            "component will continue finalization as directed by the new " +
+            "leader.", e);
+        return FINALIZATION_IN_PROGRESS_MSG;
       } finally {
         finalizationLock.unlock();
       }
     } else {
-      // We could not acquire the lock, so either finalization is ongoing, or
-      // it already finished but we received multiple requests to
-      // run it at the same time.
-      if (!isFinalized(response.status())) {
-        response = FINALIZATION_IN_PROGRESS_MSG;
-      }
+      // Finalization has not completed, but another thread holds the lock to
+      // run finalization.
+      return FINALIZATION_IN_PROGRESS_MSG;
     }
-    return response;
   }
 
   public synchronized StatusAndMessages reportStatus(
@@ -126,12 +136,20 @@ public abstract class BasicUpgradeFinalizer
     return versionManager.getUpgradeState();
   }
 
+  /**
+   * Child classes may override this method to set when finalization has
+   * begun progress.
+   */
   protected void preFinalizeUpgrade(T service) throws IOException {
-    // No Op by default.
+    versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
   }
 
+  /**
+   * Child classes may override this method to delay finalization being
+   * marked done until a set of post finalize actions complete.
+   */
   protected void postFinalizeUpgrade(T service) throws IOException {
-    // No Op by default.
+    versionManager.setUpgradeState(FINALIZATION_DONE);
   }
 
   @Override
@@ -226,15 +244,6 @@ public abstract class BasicUpgradeFinalizer
         || status.equals(FINALIZATION_DONE);
   }
 
-  /**
-   * Child classes that have additional finalization steps can override this
-   * method to check component specific state to determine whether
-   * finalization still needs to be run.
-   */
-  protected boolean componentFinishedFinalizationSteps(T service) {
-    return true;
-  }
-
   public abstract void finalizeLayoutFeature(LayoutFeature lf, T context)
       throws UpgradeException;
 
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
index 1e85581d9b..6746536d7c 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.upgrade;
 
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 
 import java.io.IOException;
@@ -43,8 +42,6 @@ public class DefaultUpgradeFinalizationExecutor<T>
       throws IOException {
     try {
       finalizer.emitStartingMsg();
-      finalizer.getVersionManager()
-          .setUpgradeState(FINALIZATION_IN_PROGRESS);
 
       finalizer.preFinalizeUpgrade(component);
 
@@ -59,9 +56,10 @@ public class DefaultUpgradeFinalizationExecutor<T>
       if (finalizer.getVersionManager().needsFinalization()) {
         finalizer.getVersionManager()
             .setUpgradeState(FINALIZATION_REQUIRED);
-        throw (e);
+        throw e;
       }
     } finally {
+      // Used for testing.
       finalizer.markFinalizationDone();
     }
   }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
index bf2f35664a..23fff3576c 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
@@ -22,7 +22,6 @@ import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecuto
 import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE;
 import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE;
 import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.BEFORE_PRE_FINALIZE_UPGRADE;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 
 import java.io.IOException;
@@ -73,8 +72,6 @@ public class InjectedUpgradeFinalizationExecutor<T> extends
     try {
       injectTestFunctionAtThisPoint(BEFORE_PRE_FINALIZE_UPGRADE);
       finalizer.emitStartingMsg();
-      finalizer.getVersionManager()
-          .setUpgradeState(FINALIZATION_IN_PROGRESS);
 
       finalizer.preFinalizeUpgrade(component);
       injectTestFunctionAtThisPoint(AFTER_PRE_FINALIZE_UPGRADE);
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
index 5b83f3552e..96344f3984 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
@@ -22,9 +22,12 @@ import static 
org.apache.hadoop.ozone.upgrade.TestUpgradeFinalizerActions.MockLa
 import static 
org.apache.hadoop.ozone.upgrade.TestUpgradeFinalizerActions.MockLayoutFeature.VERSION_3;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+
+import 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.inOrder;
@@ -32,6 +35,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.ozone.common.Storage;
 import 
org.apache.hadoop.ozone.upgrade.TestUpgradeFinalizerActions.MockLayoutVersionManager;
@@ -39,12 +47,17 @@ import 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatchers;
 import org.mockito.InOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test for BasicUpgradeFinalizer.
  */
 public class TestBasicUpgradeFinalizer {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestBasicUpgradeFinalizer.class);
+
   @Test
   public void testFinalizerPhasesAreInvokedInOrder() throws IOException {
     SimpleTestFinalizer finalizer = spy(SimpleTestFinalizer.class);
@@ -91,6 +104,82 @@ public class TestBasicUpgradeFinalizer {
         finalizer.postCalled);
   }
 
+
+  /**
+   * Tests that the upgrade finalizer gives expected statuses when multiple
+   * clients invoke finalize and query finalize status simultaneously.
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentFinalization() throws Exception {
+    CountDownLatch pauseLatch = new CountDownLatch(1);
+    CountDownLatch unpauseLatch = new CountDownLatch(1);
+    // Pause finalization to test concurrent finalize requests. The injection
+    // point to pause at does not matter.
+    InjectedUpgradeFinalizationExecutor<Object> executor =
+        UpgradeTestUtils.newPausingFinalizationExecutor(
+            UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE,
+            pauseLatch, unpauseLatch, LOG);
+    SimpleTestFinalizer finalizer =
+        new SimpleTestFinalizer(
+            new MockLayoutVersionManager(VERSION_1.layoutVersion()), executor);
+
+    // The first finalize call should block until the executor is unpaused.
+    Future<?> firstFinalizeFuture = runFinalization(finalizer,
+        UpgradeFinalizer.Status.STARTING_FINALIZATION);
+    // Wait for finalization to pause at the halting point.
+    pauseLatch.await();
+
+    Future<?> secondFinalizeFuture = runFinalization(finalizer,
+        UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS);
+    Future<?> finalizeQueryFuture = runFinalizationQuery(finalizer,
+        UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS);
+
+    // While finalization is paused, the two following requests should have
+    // reported it is in progress.
+    secondFinalizeFuture.get();
+    finalizeQueryFuture.get();
+
+    // Now resume finalization so the initial finalize request can complete.
+    unpauseLatch.countDown();
+    firstFinalizeFuture.get();
+
+    // All subsequent queries should return finalization done, even if they
+    // land in parallel.
+    List<Future<?>> finalizeFutures = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      finalizeFutures.add(runFinalizationQuery(finalizer,
+          UpgradeFinalizer.Status.FINALIZATION_DONE));
+    }
+
+    // Wait for all queries to complete.
+    for (Future<?> finalizeFuture: finalizeFutures) {
+      finalizeFuture.get();
+    }
+  }
+
+  private Future<?> runFinalization(
+      BasicUpgradeFinalizer<Object, MockLayoutVersionManager> finalizer,
+      UpgradeFinalizer.Status expectedStatus) {
+    return Executors.newSingleThreadExecutor().submit(() -> {
+      try {
+        StatusAndMessages result = finalizer.finalize("test", new Object());
+        assertEquals(expectedStatus, result.status());
+      } catch (Exception ex) {
+        LOG.error("Finalization failed", ex);
+        fail("Finalization failed with exception: " +
+            ex.getMessage());
+      }
+    });
+  }
+
+  private Future<?> runFinalizationQuery(UpgradeFinalizer<Object> finalizer,
+      UpgradeFinalizer.Status expectedStatus) {
+    return Executors.newSingleThreadExecutor().submit(() -> {
+      assertEquals(expectedStatus, finalizer.getStatus());
+    });
+  }
+
   /**
    * Yet another mock finalizer.
    */
@@ -101,6 +190,9 @@ public class TestBasicUpgradeFinalizer {
     private boolean finalizeCalled = false;
     private boolean postCalled = false;
 
+    /**
+     * Invoked by Mockito.
+     */
     SimpleTestFinalizer() throws IOException {
       super(new MockLayoutVersionManager(VERSION_1.layoutVersion()));
     }
@@ -109,13 +201,20 @@ public class TestBasicUpgradeFinalizer {
       super(lvm);
     }
 
+    SimpleTestFinalizer(MockLayoutVersionManager lvm,
+                        UpgradeFinalizationExecutor<Object> executor) {
+      super(lvm, executor);
+    }
+
     @Override
-    protected void preFinalizeUpgrade(Object service) {
+    protected void preFinalizeUpgrade(Object service) throws IOException {
+      super.preFinalizeUpgrade(service);
       preCalled = true;
     }
 
     @Override
-    protected void postFinalizeUpgrade(Object service) {
+    protected void postFinalizeUpgrade(Object service) throws IOException {
+      super.postFinalizeUpgrade(service);
       postCalled = true;
     }
 
@@ -141,7 +240,7 @@ public class TestBasicUpgradeFinalizer {
 
     @Override
     public void runPrefinalizeStateActions(Storage storage, Object service) {
-
+      // no-op for testing.
     }
   }
 }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
index d3990e6098..d6ea2ec8b8 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
@@ -18,12 +18,15 @@
  */
 package org.apache.hadoop.ozone.upgrade;
 
+import 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.common.StorageInfo;
+import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Upgrade related test utility methods.
@@ -51,4 +54,60 @@ public final class UpgradeTestUtils {
 
     return versionFile;
   }
+
+  /**
+   * @param haltingPoint Where to halt finalization in the returned
+   *     executor's {@code execute} method.
+   * @param pauseLatch The latch that will be counted down 1 by the
+   *     executor when the upgrade finalization has been paused.
+   * @param unpauseLatch The latch that the caller should count down to
+   *     resume upgrade finalization.
+   * @param log Where to log messages about pausing and resuming finalization.
+   * @return A new InjectedUpgradeFinalizationExecutor
+   */
+  public static <T> InjectedUpgradeFinalizationExecutor<T>
+      newPausingFinalizationExecutor(UpgradeTestInjectionPoints haltingPoint,
+      CountDownLatch pauseLatch, CountDownLatch unpauseLatch, Logger log) {
+    InjectedUpgradeFinalizationExecutor<T>
+        executor = new InjectedUpgradeFinalizationExecutor<>();
+    executor.configureTestInjectionFunction(haltingPoint, () -> {
+      log.info("Halting upgrade finalization at point: {}", haltingPoint);
+      try {
+        pauseLatch.countDown();
+        unpauseLatch.await();
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IOException("SCM test finalization interrupted.", ex);
+      }
+      log.info("Upgrade finalization resumed from point: {}", haltingPoint);
+      return false;
+    });
+
+    return executor;
+  }
+
+  /**
+   * @param haltingPoint Where to halt finalization in the returned
+   *     executor's {@code execute} method.
+   * @param terminateLatch The latch that will be counted down 1 by the
+   *    executor when the upgrade finalization has been terminated.
+   * @param log Where to log messages about pausing and resuming finalization.
+   * @return A new InjectedUpgradeFinalizationExecutor
+   */
+  public static <T> InjectedUpgradeFinalizationExecutor<T>
+      newTerminatingFinalizationExecutor(
+          UpgradeTestInjectionPoints haltingPoint,
+      CountDownLatch terminateLatch, Logger log) {
+    InjectedUpgradeFinalizationExecutor<T>
+        executor =
+        new InjectedUpgradeFinalizationExecutor<>();
+    executor.configureTestInjectionFunction(haltingPoint, () -> {
+      log.info("Terminating upgrade finalization at point: {}. This is " +
+          "expected test execution.", haltingPoint);
+      terminateLatch.countDown();
+      return true;
+    });
+
+    return executor;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
index 5b6272c2ab..bb4c5075b5 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.upgrade;
 
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_VALIDATION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 
 import java.io.IOException;
@@ -55,6 +56,7 @@ public class DataNodeUpgradeFinalizer extends
       logAndEmit(msg);
       throw new UpgradeException(msg, PREFINALIZE_VALIDATION_FAILED);
     }
+    getVersionManager().setUpgradeState(FINALIZATION_IN_PROGRESS);
   }
 
   private boolean canFinalizeDataNode(DatanodeStateMachine dsm) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
index dc1ca87ee0..de15514839 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
@@ -250,10 +250,10 @@ public final class SCMContext {
     }
   }
 
-  public boolean isFinalizationCheckpointCrossed(FinalizationCheckpoint query) 
{
+  public FinalizationCheckpoint getFinalizationCheckpoint() {
     lock.readLock().lock();
     try {
-      return this.finalizationCheckpoint.hasCrossed(query);
+      return this.finalizationCheckpoint;
     } finally {
       lock.readLock().unlock();
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
index bb12df6ec0..71cdd92774 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
@@ -60,12 +60,16 @@ public class SCMHAInvocationHandler implements 
InvocationHandler {
   @Override
   public Object invoke(final Object proxy, final Method method,
                        final Object[] args) throws Throwable {
+    // Javadoc for InvocationHandler#invoke specifies that args will be null
+    // if the method takes no arguments. Convert this to an empty array for
+    // easier handling.
+    Object[] convertedArgs = (args == null) ? new Object[]{} : args;
     try {
       long startTime = Time.monotonicNow();
       final Object result =
           ratisHandler != null && method.isAnnotationPresent(Replicate.class) ?
-              invokeRatis(method, args) :
-              invokeLocal(method, args);
+              invokeRatis(method, convertedArgs) :
+              invokeLocal(method, convertedArgs);
       LOG.debug("Call: {} took {} ms", method, Time.monotonicNow() - 
startTime);
       return result;
     } catch (InvocationTargetException iEx) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 16b32735aa..934c25e980 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -383,6 +383,9 @@ public class SCMHAManagerImpl implements SCMHAManager {
       }
       scm.getScmCertificateServer().reinitialize(metadataStore);
     }
+    // This call also performs upgrade finalization if the new table contains a
+    // higher metadata layout version than the SCM's current one.
+    scm.getFinalizationManager().reinitialize(metadataStore.getMetaTable());
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 74074bc454..145a40a665 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -328,6 +328,7 @@ public class SCMStateMachine extends BaseStateMachine {
           .isLeaderReady()) {
         scm.getScmContext().setLeaderReady();
         scm.getSCMServiceManager().notifyStatusChanged();
+        scm.getFinalizationManager().onLeaderReady();
       }
 
       // Means all transactions before this term have been applied.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index da4337f68a..b8a83e20b9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -46,7 +46,7 @@ import 
org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.node.states.NodeStateMap;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
@@ -216,8 +216,8 @@ public class NodeStateManager implements Runnable, 
Closeable {
     // to healthy readonly until SCM finishes updating its MLV, hence the
     // checkpoint check here.
     layoutMisMatchCondition = (layout) ->
-        scmContext.isFinalizationCheckpointCrossed(
-                    FinalizationCheckpoint.MLV_EQUALS_SLV) &&
+        FinalizationManager.shouldTellDatanodesToFinalize(
+            scmContext.getFinalizationCheckpoint()) &&
             !layoutMatchCondition.test(layout);
 
     scheduleNextHealthCheck();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 78a1b632f2..62ef30d27c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ipc.Server;
@@ -655,8 +655,8 @@ public class SCMNodeManager implements NodeManager {
           datanodeDetails.getHostName(), dnSlv, scmSlv);
     }
 
-    if (scmContext.isFinalizationCheckpointCrossed(
-        FinalizationCheckpoint.MLV_EQUALS_SLV)) {
+    if (FinalizationManager.shouldTellDatanodesToFinalize(
+        scmContext.getFinalizationCheckpoint())) {
       // Because we have crossed the MLV_EQUALS_SLV checkpoint, SCM metadata
       // layout version will not change. We can now compare it to the
       // datanodes' metadata layout versions to tell them to finalize.
@@ -675,16 +675,19 @@ public class SCMNodeManager implements NodeManager {
                 LayoutVersionProto.newBuilder()
                     .setSoftwareLayoutVersion(dnSlv)
                     .setMetadataLayoutVersion(dnSlv).build());
-        try {
-          finalizeCmd.setTerm(scmContext.getTermOfLeader());
-
-          // Send Finalize command to the data node. Its OK to
-          // send Finalize command multiple times.
-          scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-              new CommandForDatanode<>(datanodeDetails.getUuid(), 
finalizeCmd));
-        } catch (NotLeaderException ex) {
-          LOG.warn("Skip sending finalize upgrade command since current SCM" +
-              "is not leader.", ex);
+        if (scmContext.isLeader()) {
+          try {
+            finalizeCmd.setTerm(scmContext.getTermOfLeader());
+
+            // Send Finalize command to the data node. Its OK to
+            // send Finalize command multiple times.
+            scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+                new CommandForDatanode<>(datanodeDetails.getUuid(),
+                    finalizeCmd));
+          } catch (NotLeaderException ex) {
+            LOG.warn("Skip sending finalize upgrade command since current SCM" 
+
+                " is not leader.", ex);
+          }
         }
       }
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index cf3ab0baf5..d8a1a9403f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -109,9 +109,6 @@ public class BackgroundPipelineCreator implements 
SCMService {
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
-
-    // start RatisPipelineUtilsThread
-    start();
   }
 
   /**
@@ -162,6 +159,10 @@ public class BackgroundPipelineCreator implements 
SCMService {
     }
   }
 
+  public boolean isRunning() {
+    return running.get();
+  }
+
   private void run() {
     while (running.get()) {
       if (shouldRun()) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 09816b6b92..b1c3032504 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -191,6 +191,8 @@ public interface PipelineManager extends Closeable, 
PipelineManagerMXBean {
    */
   void resumePipelineCreation();
 
+  boolean isPipelineCreationFrozen();
+
   /**
    * Acquire read lock.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 3e5d86d0af..aaa3088f9a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -155,6 +156,16 @@ public class PipelineManagerImpl implements 
PipelineManager {
     BackgroundPipelineCreator backgroundPipelineCreator =
         new BackgroundPipelineCreator(pipelineManager, conf, scmContext, 
clock);
 
+    pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+    serviceManager.register(backgroundPipelineCreator);
+
+    if (FinalizationManager.shouldCreateNewPipelines(
+        scmContext.getFinalizationCheckpoint())) {
+      pipelineManager.resumePipelineCreation();
+    } else {
+      pipelineManager.freezePipelineCreation();
+    }
+
     final long scrubberIntervalInMillis = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
         ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
@@ -178,10 +189,7 @@ public class PipelineManagerImpl implements 
PipelineManager {
               }
             }).build();
 
-    pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
     pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
-
-    serviceManager.register(backgroundPipelineCreator);
     serviceManager.register(backgroundPipelineScrubber);
 
     return pipelineManager;
@@ -686,6 +694,12 @@ public class PipelineManagerImpl implements 
PipelineManager {
     backgroundPipelineCreator.start();
   }
 
+  @Override
+  public boolean isPipelineCreationFrozen() {
+    return freezePipelineCreation.get() &&
+        !backgroundPipelineCreator.isRunning();
+  }
+
   @Override
   public void close() throws IOException {
     if (backgroundPipelineCreator != null) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 3e735b454b..02e5fdc430 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -25,9 +25,11 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 
@@ -53,12 +55,14 @@ public class HealthyPipelineSafeModeRule extends 
SafeModeExitRule<Pipeline> {
   private final Set<PipelineID> processedPipelineIDs = new HashSet<>();
   private final PipelineManager pipelineManager;
   private final int minHealthyPipelines;
+  private final SCMContext scmContext;
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
-      PipelineManager pipelineManager,
-      SCMSafeModeManager manager, ConfigurationSource configuration) {
+      PipelineManager pipelineManager, SCMSafeModeManager manager,
+      ConfigurationSource configuration, SCMContext scmContext) {
     super(manager, ruleName, eventQueue);
     this.pipelineManager = pipelineManager;
+    this.scmContext = scmContext;
     healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
@@ -101,7 +105,16 @@ public class HealthyPipelineSafeModeRule extends 
SafeModeExitRule<Pipeline> {
 
   @Override
   protected synchronized boolean validate() {
-    return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
+    boolean shouldRunSafemodeCheck =
+        FinalizationManager.shouldCreateNewPipelines(
+            scmContext.getFinalizationCheckpoint());
+    if (!shouldRunSafemodeCheck) {
+      LOG.info("All SCM pipelines are closed due to ongoing upgrade " +
+          "finalization. Bypassing healthy pipeline safemode rule.");
+      return true;
+    } else {
+      return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
+    }
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a13a2d73b7..fe0c7bef77 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -140,7 +140,7 @@ public class SCMSafeModeManager implements SafeModeManager {
         HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
             new HealthyPipelineSafeModeRule(HEALTHY_PIPELINE_EXIT_RULE,
                 eventQueue, pipelineManager,
-                this, config);
+                this, config, scmContext);
         OneReplicaPipelineSafeModeRule oneReplicaPipelineSafeModeRule =
             new OneReplicaPipelineSafeModeRule(
                 ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, eventQueue,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
index 1db1323f24..84e662bf4e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdds.scm.server.upgrade;
 
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+
 /**
  * A finalization checkpoint is an abstraction over SCM's disk state,
  * indicating where finalization left off so it can be resumed on leader
@@ -28,18 +30,27 @@ package org.apache.hadoop.hdds.scm.server.upgrade;
  * layout version.
  */
 public enum FinalizationCheckpoint {
-  FINALIZATION_REQUIRED(false, true),
-  FINALIZATION_STARTED(true, true),
-  MLV_EQUALS_SLV(true, false),
-  FINALIZATION_COMPLETE(false, false);
+  FINALIZATION_REQUIRED(false, true,
+      UpgradeFinalizer.Status.FINALIZATION_REQUIRED),
+  FINALIZATION_STARTED(true, true,
+      UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS),
+  MLV_EQUALS_SLV(true, false,
+      UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS),
+  FINALIZATION_COMPLETE(false, false,
+      UpgradeFinalizer.Status.FINALIZATION_DONE);
 
   private final boolean needsFinalizingMark;
   private final boolean needsMlvBehindSlv;
+  // The upgrade status that should be reported back to the client when this
+  // checkpoint is crossed.
+  private final UpgradeFinalizer.Status status;
 
   FinalizationCheckpoint(boolean needsFinalizingMark,
-                         boolean needsMlvBehindSlv) {
+                         boolean needsMlvBehindSlv,
+                         UpgradeFinalizer.Status status) {
     this.needsFinalizingMark = needsFinalizingMark;
     this.needsMlvBehindSlv = needsMlvBehindSlv;
+    this.status = status;
   }
 
   /**
@@ -70,4 +81,8 @@ public enum FinalizationCheckpoint {
   public boolean hasCrossed(FinalizationCheckpoint query) {
     return this.compareTo(query) >= 0;
   }
+
+  public UpgradeFinalizer.Status getStatus() {
+    return status;
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
index 7456c8ed46..49fc914fb3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 
@@ -51,4 +52,19 @@ public interface FinalizationManager {
   void buildUpgradeContext(NodeManager nodeManager,
                                   PipelineManager pipelineManager,
                                   SCMContext scmContext);
+
+  void reinitialize(Table<String, String> finalizationStore) throws 
IOException;
+
+  void onLeaderReady();
+
+  static boolean shouldCreateNewPipelines(FinalizationCheckpoint checkpoint) {
+    return !checkpoint.hasCrossed(FinalizationCheckpoint.FINALIZATION_STARTED)
+        || checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
+  }
+
+  static boolean shouldTellDatanodesToFinalize(
+      FinalizationCheckpoint checkpoint) {
+    return checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
+  }
+
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
index 4d759469d3..0a6c347658 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
@@ -32,14 +32,21 @@ import 
org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.DefaultUpgradeFinalizationExecutor;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.Executors;
 
 /**
  * Class to initiate SCM finalization and query its progress.
  */
 public class FinalizationManagerImpl implements FinalizationManager {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FinalizationManagerImpl.class);
+
   private SCMUpgradeFinalizer upgradeFinalizer;
   private SCMUpgradeFinalizationContext context;
   private SCMStorageConfig storage;
@@ -134,6 +141,37 @@ public class FinalizationManagerImpl implements 
FinalizationManager {
     return finalizationStateManager.getFinalizationCheckpoint();
   }
 
+  @Override
+  public void reinitialize(Table<String, String> finalizationStore)
+      throws IOException {
+    finalizationStateManager.reinitialize(finalizationStore);
+  }
+
+  @Override
+  public void onLeaderReady() {
+    // Launch a background thread to drive finalization.
+    Executors.newSingleThreadExecutor().submit(() -> {
+      FinalizationCheckpoint currentCheckpoint = getCheckpoint();
+      if (currentCheckpoint.hasCrossed(
+          FinalizationCheckpoint.FINALIZATION_STARTED) &&
+          !currentCheckpoint.hasCrossed(
+              FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
+        LOG.info("SCM became leader. Resuming upgrade finalization from" +
+            " current checkpoint {}.", currentCheckpoint);
+        try {
+          finalizeUpgrade("resume-finalization-as-leader");
+        } catch (IOException ex) {
+          ExitUtils.terminate(1,
+              "Resuming upgrade finalization failed on SCM leader change.",
+              ex, true, LOG);
+        }
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("SCM became leader. No upgrade finalization action" +
+            " required for current checkpoint {}", currentCheckpoint);
+      }
+    });
+  }
+
   /**
    * Builds a {@link FinalizationManagerImpl}.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
index 44868715f8..fe1a8e91dd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm.server.upgrade;
 
 import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.utils.db.Table;
 
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
@@ -46,4 +47,10 @@ public interface FinalizationStateManager {
   FinalizationCheckpoint getFinalizationCheckpoint();
 
   void setUpgradeContext(SCMUpgradeFinalizationContext context);
+
+  /**
+   * Called on snapshot installation.
+   */
+  void reinitialize(Table<String, String> newFinalizationStore)
+      throws IOException;
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
index 786612628a..958343cb40 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hdds.scm.server.upgrade;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
 import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -41,10 +44,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class FinalizationStateManagerImpl implements FinalizationStateManager {
 
-  private static final Logger LOG =
+  @VisibleForTesting
+  public static final Logger LOG =
       LoggerFactory.getLogger(FinalizationStateManagerImpl.class);
 
-  private final Table<String, String> finalizationStore;
+  private Table<String, String> finalizationStore;
   private final DBTransactionBuffer transactionBuffer;
   private final HDDSLayoutVersionManager versionManager;
   // Ensures that we are not in the process of updating checkpoint state as
@@ -62,13 +66,51 @@ public class FinalizationStateManagerImpl implements 
FinalizationStateManager {
     this.upgradeFinalizer = builder.upgradeFinalizer;
     this.versionManager = this.upgradeFinalizer.getVersionManager();
     this.checkpointLock = new ReentrantReadWriteLock();
+    initialize();
+  }
+
+  private void initialize() throws IOException {
     this.hasFinalizingMark =
         finalizationStore.isExist(OzoneConsts.FINALIZING_KEY);
   }
 
+  private void publishCheckpoint(FinalizationCheckpoint checkpoint) {
+    // Move the upgrade status according to this checkpoint. This is sent
+    // back to the client if they query for the current upgrade status.
+    versionManager.setUpgradeState(checkpoint.getStatus());
+
+    // Check whether this checkpoint change requires us to move node state.
+    // If this is necessary, it must be done before unfreezing pipeline
+    // creation to make sure nodes are not added to pipelines based on
+    // outdated layout information.
+    // This operation is not idempotent.
+    if (checkpoint == FinalizationCheckpoint.MLV_EQUALS_SLV) {
+      upgradeContext.getNodeManager().forceNodesToHealthyReadOnly();
+    }
+
+    // Check whether this checkpoint change requires us to freeze pipeline
+    // creation. These are idempotent operations.
+    PipelineManager pipelineManager = upgradeContext.getPipelineManager();
+    if (FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
+        pipelineManager.isPipelineCreationFrozen()) {
+      pipelineManager.resumePipelineCreation();
+    } else if (!FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
+          !pipelineManager.isPipelineCreationFrozen()) {
+      pipelineManager.freezePipelineCreation();
+    }
+
+    // Set the checkpoint in the SCM context so other components can read it.
+    upgradeContext.getSCMContext().setFinalizationCheckpoint(checkpoint);
+  }
+
   @Override
   public void setUpgradeContext(SCMUpgradeFinalizationContext context) {
     this.upgradeContext = context;
+    FinalizationCheckpoint checkpoint = getFinalizationCheckpoint();
+    upgradeContext.getSCMContext().setFinalizationCheckpoint(checkpoint);
+    // Set the version manager's upgrade status (sent back to the client to
+    // identify upgrade progress) based on the current checkpoint.
+    versionManager.setUpgradeState(checkpoint.getStatus());
   }
 
   @Override
@@ -79,14 +121,22 @@ public class FinalizationStateManagerImpl implements 
FinalizationStateManager {
     } finally {
       checkpointLock.writeLock().unlock();
     }
-    upgradeContext.getSCMContext().setFinalizationCheckpoint(
-        FinalizationCheckpoint.FINALIZATION_STARTED);
     transactionBuffer.addToBuffer(finalizationStore,
         OzoneConsts.FINALIZING_KEY, "");
+    publishCheckpoint(FinalizationCheckpoint.FINALIZATION_STARTED);
   }
 
   @Override
   public void finalizeLayoutFeature(Integer layoutVersion) throws IOException {
+    finalizeLayoutFeatureLocal(layoutVersion);
+  }
+
+  /**
+   * A version of finalizeLayoutFeature without the {@link Replicate}
+   * annotation that can be called by followers to finalize from a snapshot.
+   */
+  private void finalizeLayoutFeatureLocal(Integer layoutVersion)
+      throws IOException {
     checkpointLock.writeLock().lock();
     try {
       // The VERSION file is the source of truth for the current layout
@@ -101,8 +151,7 @@ public class FinalizationStateManagerImpl implements 
FinalizationStateManager {
     }
 
     if (!versionManager.needsFinalization()) {
-      upgradeContext.getSCMContext().setFinalizationCheckpoint(
-          FinalizationCheckpoint.MLV_EQUALS_SLV);
+      publishCheckpoint(FinalizationCheckpoint.MLV_EQUALS_SLV);
     }
     transactionBuffer.addToBuffer(finalizationStore,
         OzoneConsts.LAYOUT_VERSION_KEY, String.valueOf(layoutVersion));
@@ -132,8 +181,7 @@ public class FinalizationStateManagerImpl implements 
FinalizationStateManager {
       ExitUtils.terminate(1, errorMessage, LOG);
     }
 
-    upgradeContext.getSCMContext().setFinalizationCheckpoint(
-        FinalizationCheckpoint.FINALIZATION_COMPLETE);
+    publishCheckpoint(FinalizationCheckpoint.FINALIZATION_COMPLETE);
   }
 
   @Override
@@ -164,14 +212,90 @@ public class FinalizationStateManagerImpl implements 
FinalizationStateManager {
       }
     }
 
-    String errorMessage = String.format("SCM upgrade finalization " +
-            "is in an unknown state.%nFinalizing mark present? %b%n" +
-            "Metadata layout version behind software layout version? %b",
-        hasFinalizingMarkSnapshot, mlvBehindSlvSnapshot);
-    Preconditions.checkNotNull(currentCheckpoint, errorMessage);
+    // SCM cannot function if it does not know which finalization checkpoint
+    // it is on, so it must terminate. This should only happen in the case of
+    // a serious bug.
+    if (currentCheckpoint == null) {
+      String errorMessage = String.format("SCM upgrade finalization " +
+              "is in an unknown state.%nFinalizing mark present? %b%n" +
+              "Metadata layout version behind software layout version? %b",
+          hasFinalizingMarkSnapshot, mlvBehindSlvSnapshot);
+      ExitUtils.terminate(1, errorMessage, LOG);
+    }
+
     return currentCheckpoint;
   }
 
+  /**
+   * Called on snapshot installation.
+   */
+  @Override
+  public void reinitialize(Table<String, String> newFinalizationStore)
+      throws IOException {
+    checkpointLock.writeLock().lock();
+    try {
+      this.finalizationStore.close();
+      this.finalizationStore = newFinalizationStore;
+      initialize();
+
+      int dbLayoutVersion = getDBLayoutVersion();
+      int currentLayoutVersion = versionManager.getMetadataLayoutVersion();
+      if (currentLayoutVersion < dbLayoutVersion) {
+        // Snapshot contained a higher metadata layout version. Finalize this
+        // follower SCM as a result.
+        LOG.info("New SCM snapshot received with metadata layout version {}, " 
+
+                "which is higher than this SCM's metadata layout version {}. " +
+                "Attempting to finalize current SCM to that version.",
+            dbLayoutVersion, currentLayoutVersion);
+        // Since the SCM is finalizing from a snapshot, it is a follower, and
+        // does not need to run the leader only finalization driving actions
+        // that the UpgradeFinalizationExecutor contains. Just run the
+        // upgrade actions for the layout features, set the finalization
+        // checkpoint, and increase the version in the VERSION file.
+        for (int version = currentLayoutVersion + 1; version <= 
dbLayoutVersion;
+             version++) {
+          finalizeLayoutFeatureLocal(version);
+        }
+      }
+      publishCheckpoint(getFinalizationCheckpoint());
+    } catch (Exception ex) {
+      LOG.error("Failed to reinitialize finalization state", ex);
+      throw new IOException(ex);
+    } finally {
+      checkpointLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Gets the metadata layout version from the SCM RocksDB. This is used for
+   * Ratis snapshot based finalization in a slow follower. In all other
+   * cases, the VERSION file should be the source of truth.
+   *
+   * MLV was not stored in RocksDB until SCM HA supported snapshot based
+   * finalization, which was after a few HDDS layout features
+   * were introduced. If the SCM has not finalized since this code
+   * was added, the layout version will not be there. Defer to the MLV in the
+   * VERSION file in this case, since finalization is not ongoing. The key will
+   * be added once finalization is started with this software version.
+   */
+  private int getDBLayoutVersion() throws IOException {
+    String dbLayoutVersion = finalizationStore.get(
+        OzoneConsts.LAYOUT_VERSION_KEY);
+    if (dbLayoutVersion == null) {
+      return versionManager.getMetadataLayoutVersion();
+    } else {
+      try {
+        return Integer.parseInt(dbLayoutVersion);
+      } catch (NumberFormatException ex) {
+        String msg = String.format(
+            "Failed to read layout version from SCM DB. Found string %s",
+            dbLayoutVersion);
+        LOG.error(msg, ex);
+        throw new IOException(msg, ex);
+      }
+    }
+  }
+
   /**
    * Builds a {@link FinalizationManagerImpl}.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
index fa3520ee38..b65cc00706 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.LayoutFeature;
 import org.apache.hadoop.ozone.upgrade.UpgradeException;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 
 /**
  * UpgradeFinalizer for the Storage Container Manager service.
@@ -59,19 +61,6 @@ public class SCMUpgradeFinalizer extends
     LOG.info("SCM Finalization has crossed checkpoint {}", checkpoint);
   }
 
-  @Override
-  protected boolean componentFinishedFinalizationSteps(
-      SCMUpgradeFinalizationContext context) {
-    // By default, the parent class will mark finalization as complete when
-    // MLV == SLV. However, for SCM, there are a few extra steps that need to
-    // be done after this point (see postFinalizeUpgrade). If there is a
-    // leader change or restart after MLV == SLV but before running these
-    // post finalize steps, we must tell the parent to run finalization
-    // even though MLV == SLV.
-    return context.getFinalizationStateManager()
-        .crossedCheckpoint(FinalizationCheckpoint.FINALIZATION_COMPLETE);
-  }
-
   @Override
   public void preFinalizeUpgrade(SCMUpgradeFinalizationContext context)
       throws IOException {
@@ -123,33 +112,24 @@ public class SCMUpgradeFinalizer extends
     super.finalizeLayoutFeature(lf,
         lf.scmAction(LayoutFeature.UpgradeActionType.ON_FINALIZE),
         context.getStorage());
-
-    if (!getVersionManager().needsFinalization()) {
-      // If we just finalized the last layout feature, don 't wait for next
-      // heartbeat from datanodes in order to move them to
-      // Healthy - Readonly state. Force them to Healthy ReadOnly state so that
-      // we can resume pipeline creation right away.
-      context.getNodeManager().forceNodesToHealthyReadOnly();
-    }
   }
 
   public void postFinalizeUpgrade(SCMUpgradeFinalizationContext context)
       throws IOException {
-    try {
-      // If we reached this phase of finalization, all layout features should
-      // be finalized.
-      logCheckpointCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
-      FinalizationStateManager stateManager =
-          context.getFinalizationStateManager();
-      if (!stateManager.crossedCheckpoint(
-          FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
-        createPipelinesAfterFinalization(context.getPipelineManager());
+    // If we reached this phase of finalization, all layout features should
+    // be finalized.
+    logCheckpointCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
+    FinalizationStateManager stateManager =
+        context.getFinalizationStateManager();
+    if (!stateManager.crossedCheckpoint(
+        FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
+      createPipelinesAfterFinalization(context);
+      // @Replicate methods are required to throw TimeoutException.
+      try {
         stateManager.removeFinalizingMark();
+      } catch (TimeoutException ex) {
+        throw new IOException(ex);
       }
-      logCheckpointCrossed(FinalizationCheckpoint.FINALIZATION_COMPLETE);
-    } catch (TimeoutException ex) {
-      LOG.error("TimeoutException during postFinalizeUpgrade", ex);
-      throw new IOException(ex);
     }
   }
 
@@ -172,8 +152,13 @@ public class SCMUpgradeFinalizer extends
     msg += "\n  New pipelines creation will remain frozen until Upgrade " +
         "is finalized.";
 
-    // Pipeline creation will remain frozen until postFinalizeUpgrade()
-    pipelineManager.freezePipelineCreation();
+    // Pipeline creation should already be frozen when the finalization state
+    // manager set the checkpoint.
+    if (!pipelineManager.isPipelineCreationFrozen()) {
+      throw new SCMException("Error during finalization. Pipeline creation " +
+          "should have been frozen before closing existing pipelines.",
+          SCMException.ResultCodes.INTERNAL_ERROR);
+    }
 
     for (Pipeline pipeline : pipelineManager.getPipelines()) {
       if (pipeline.getPipelineState() != CLOSED) {
@@ -194,13 +179,25 @@ public class SCMUpgradeFinalizer extends
   }
 
   private void createPipelinesAfterFinalization(
-      PipelineManager pipelineManager) {
-    pipelineManager.resumePipelineCreation();
+      SCMUpgradeFinalizationContext context) throws SCMException,
+      NotLeaderException {
+    // Pipeline creation should already be resumed when the finalization state
+    // manager set the checkpoint.
+    PipelineManager pipelineManager = context.getPipelineManager();
+    if (pipelineManager.isPipelineCreationFrozen()) {
+      throw new SCMException("Error during finalization. Pipeline creation " +
+          "should have been resumed before waiting for new pipelines.",
+          SCMException.ResultCodes.INTERNAL_ERROR);
+    }
 
     // Wait for at least one pipeline to be created before finishing
     // finalization, so clients can write.
     boolean hasPipeline = false;
     while (!hasPipeline) {
+      // Break out of the wait and step down from driving finalization if this
+      // SCM is no longer the leader by throwing NotLeaderException.
+      context.getSCMContext().getTermOfLeader();
+
       ReplicationConfig ratisThree =
           ReplicationConfig.fromProtoTypeAndFactor(
               HddsProtos.ReplicationType.RATIS,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index efa4bed92e..dd5cf9b401 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -315,4 +315,9 @@ public class MockPipelineManager implements PipelineManager 
{
   public void releaseWriteLock() {
 
   }
+
+  @Override
+  public boolean isPipelineCreationFrozen() {
+    return false;
+  }
 }
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
index 49012abc61..11174a0341 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatchers;
@@ -61,6 +62,10 @@ public class TestScmFinalization {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestScmFinalization.class);
 
+  // Indicates the current state of the mock pipeline manager's pipeline
+  // creation.
+  private boolean pipelineCreationFrozen = false;
+
   /**
    * Order of finalization checkpoints within the enum is used to determine
    * which ones have been passed. If ordering within the enum is changed
@@ -89,6 +94,8 @@ public class TestScmFinalization {
     HDDSLayoutVersionManager versionManager =
         new HDDSLayoutVersionManager(
             HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    PipelineManager pipelineManager =
+        getMockPipelineManager(FinalizationCheckpoint.FINALIZATION_REQUIRED);
     // State manager keeps upgrade information in memory as well as writing
     // it to disk, so we can mock the classes that handle disk ops for this
     // test.
@@ -111,7 +118,7 @@ public class TestScmFinalization {
         .setStorage(Mockito.mock(SCMStorageConfig.class))
         .setLayoutVersionManager(versionManager)
         .setSCMContext(scmContext)
-        .setPipelineManager(Mockito.mock(PipelineManager.class))
+        .setPipelineManager(pipelineManager)
         .setNodeManager(Mockito.mock(NodeManager.class))
         .build();
     stateManager.setUpgradeContext(context);
@@ -149,7 +156,9 @@ public class TestScmFinalization {
       FinalizationStateManager stateManager,
       FinalizationCheckpoint expectedCheckpoint) {
 
-    assertTrue(context.isFinalizationCheckpointCrossed(expectedCheckpoint));
+    // SCM context should have been updated with the current checkpoint.
+    assertTrue(context.getFinalizationCheckpoint()
+        .hasCrossed(expectedCheckpoint));
     for (FinalizationCheckpoint checkpoint: FinalizationCheckpoint.values()) {
       LOG.info("Comparing expected checkpoint {} to {}", expectedCheckpoint,
           checkpoint);
@@ -179,12 +188,6 @@ public class TestScmFinalization {
     LOG.info("Testing finalization beginning at checkpoint {}",
         initialCheckpoint);
 
-    PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
-    // After finalization, SCM will wait for at least one pipeline to be
-    // created. It does not care about the contents of the pipeline list, so
-    // just return something with length >= 1.
-    Mockito.when(pipelineManager.getPipelines(Mockito.any(),
-        Mockito.any())).thenReturn(Arrays.asList(null, null, null));
     // Create the table and version manager to appear as if we left off from in
     // progress finalization.
     Table<String, String> finalizationStore =
@@ -198,6 +201,8 @@ public class TestScmFinalization {
     SCMStorageConfig storage = Mockito.mock(SCMStorageConfig.class);
     SCMContext scmContext = SCMContext.emptyContext();
     scmContext.setFinalizationCheckpoint(initialCheckpoint);
+    PipelineManager pipelineManager =
+        getMockPipelineManager(initialCheckpoint);
 
     FinalizationStateManager stateManager =
         new FinalizationStateManagerTestImpl.Builder()
@@ -242,13 +247,13 @@ public class TestScmFinalization {
         ArgumentMatchers.matches(OzoneConsts.FINALIZING_KEY),
         ArgumentMatchers.matches(""));
 
+    // Next, all pipeline creation should be stopped.
+    inOrder.verify(pipelineManager, count).freezePipelineCreation();
+
     if (initialCheckpoint == FinalizationCheckpoint.FINALIZATION_STARTED) {
       count = Mockito.times(1);
     }
 
-    // Next, all pipeline creation should be stopped.
-    inOrder.verify(pipelineManager, count).freezePipelineCreation();
-
     // Next, each layout feature should be finalized.
     for (HDDSLayoutFeature feature: HDDSLayoutFeature.values()) {
       // Cannot finalize initial version since we are already there.
@@ -330,4 +335,29 @@ public class TestScmFinalization {
       return UpgradeFinalizer.STARTING_MSG;
     }
   }
+
+  private PipelineManager getMockPipelineManager(
+      FinalizationCheckpoint initialCheckpoint) {
+    PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    // After finalization, SCM will wait for at least one pipeline to be
+    // created. It does not care about the contents of the pipeline list, so
+    // just return something with length >= 1.
+    Mockito.when(pipelineManager.getPipelines(Mockito.any(),
+        Mockito.any())).thenReturn(Arrays.asList(null, null, null));
+
+    // Set the initial value for pipeline creation based on the checkpoint.
+    // In a real cluster, this would be set on startup of the
+    // PipelineManagerImpl.
+    pipelineCreationFrozen =
+        !FinalizationManager.shouldCreateNewPipelines(initialCheckpoint);
+    Mockito.doAnswer(args -> pipelineCreationFrozen = true)
+        .when(pipelineManager).freezePipelineCreation();
+    Mockito.doAnswer(args -> pipelineCreationFrozen = false)
+        .when(pipelineManager).resumePipelineCreation();
+
+    Mockito.doAnswer(args -> pipelineCreationFrozen)
+        .when(pipelineManager).isPipelineCreationFrozen();
+
+    return pipelineManager;
+  }
 }
diff --git 
a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
 
b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index 87a852b875..263da48ae2 100644
--- 
a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ 
b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.failure.FailureManager;
@@ -104,7 +105,8 @@ public class MiniOzoneChaosCluster extends 
MiniOzoneHAClusterImpl {
       OMHAService omService, SCMHAService scmService,
       List<HddsDatanodeService> hddsDatanodes, String clusterPath,
       Set<Class<? extends Failures>> clazzes) {
-    super(conf, omService, scmService, hddsDatanodes, clusterPath, null);
+    super(conf, new SCMConfigurator(), omService, scmService, hddsDatanodes,
+        clusterPath, null);
     this.numDatanodes = getHddsDatanodes().size();
     this.numOzoneManagers = omService.getServices().size();
     this.numStorageContainerManagers = scmService.getServices().size();
@@ -425,11 +427,12 @@ public class MiniOzoneChaosCluster extends 
MiniOzoneHAClusterImpl {
     failedScmSet.add(scm);
   }
 
-  public void restartStorageContainerManager(StorageContainerManager scm,
-      boolean waitForScm) throws IOException, TimeoutException,
-      InterruptedException, AuthenticationException {
-    super.restartStorageContainerManager(scm, waitForScm);
+  public StorageContainerManager restartStorageContainerManager(
+      StorageContainerManager scm, boolean waitForScm)
+      throws IOException, TimeoutException, InterruptedException,
+      AuthenticationException {
     failedScmSet.remove(scm);
+    return super.restartStorageContainerManager(scm, waitForScm);
   }
 
   // Should the selected node be stopped or started.
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
index 766c479155..7568907dc5 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
@@ -22,7 +22,6 @@ import static java.lang.Thread.sleep;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
-import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
@@ -38,9 +37,7 @@ import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_F
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -53,15 +50,12 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -72,11 +66,11 @@ import 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneClusterProvider;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
 import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
@@ -84,7 +78,6 @@ import 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor;
 import 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.apache.ozone.test.tag.Flaky;
 import org.junit.jupiter.api.AfterEach;
@@ -114,7 +107,7 @@ public class TestHDDSUpgrade {
   private static final int NUM_DATA_NODES = 3;
   private static final int NUM_SCMS = 3;
 
-  private MiniOzoneCluster cluster;
+  private MiniOzoneHAClusterImpl cluster;
   private OzoneConfiguration conf;
   private StorageContainerManager scm;
   private ContainerManager scmContainerManager;
@@ -158,7 +151,8 @@ public class TestHDDSUpgrade {
     SCMConfigurator scmConfigurator = new SCMConfigurator();
     scmConfigurator.setUpgradeFinalizationExecutor(scmFinalizationExecutor);
 
-    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
+    MiniOzoneCluster.Builder builder =
+        new MiniOzoneHAClusterImpl.Builder(conf)
         .setNumDatanodes(NUM_DATA_NODES)
         .setNumOfStorageContainerManagers(NUM_SCMS)
         .setSCMConfigurator(scmConfigurator)
@@ -183,7 +177,7 @@ public class TestHDDSUpgrade {
   }
 
   public void init() throws Exception {
-    cluster = clusterProvider.provide();
+    cluster = (MiniOzoneHAClusterImpl) clusterProvider.provide();
     conf = cluster.getConf();
     loadSCMState();
   }
@@ -227,143 +221,6 @@ public class TestHDDSUpgrade {
     key.close();
   }
 
-  /*
-   * Helper function to test Pre-Upgrade conditions on the SCM
-   */
-  private void testPreUpgradeConditionsSCM() {
-    Assert.assertEquals(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(),
-        scmVersionManager.getMetadataLayoutVersion());
-    for (ContainerInfo ci : scmContainerManager.getContainers()) {
-      Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, ci.getState());
-    }
-  }
-
-  /*
-   * Helper function to test Post-Upgrade conditions on the SCM
-   */
-  private void testPostUpgradeConditionsSCM() {
-    loadSCMState();
-    Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(),
-        scmVersionManager.getMetadataLayoutVersion());
-    Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1);
-
-    // SCM should not return from finalization until there is at least one
-    // pipeline to use.
-    try {
-      GenericTestUtils.waitFor(() -> {
-        int pipelineCount = scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
-            .size();
-        if (pipelineCount >= 1) {
-          return true;
-        }
-        return false;
-      }, 500, 60000);
-    } catch (TimeoutException | InterruptedException e) {
-      Assert.fail("Timeout waiting for Upgrade to complete on SCM.");
-    }
-
-    // SCM will not return from finalization until there is at least one
-    // RATIS 3 pipeline. For this to exist, all three of our datanodes must
-    // be in the HEALTHY state.
-    testDataNodesStateOnSCM(HEALTHY, HEALTHY_READONLY);
-
-    int countContainers = 0;
-    for (ContainerInfo ci : scmContainerManager.getContainers()) {
-      HddsProtos.LifeCycleState ciState = ci.getState();
-      LOG.info("testPostUpgradeConditionsSCM: container state is {}",
-          ciState.name());
-      Assert.assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
-          (ciState == HddsProtos.LifeCycleState.CLOSING) ||
-          (ciState == HddsProtos.LifeCycleState.DELETING) ||
-          (ciState == HddsProtos.LifeCycleState.DELETED) ||
-          (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
-      countContainers++;
-    }
-    Assert.assertTrue(countContainers >= numContainersCreated);
-  }
-
-  /*
-   * Helper function to test Pre-Upgrade conditions on all the DataNodes.
-   */
-  private void testPreUpgradeConditionsDataNodes() {
-    for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
-      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-      HDDSLayoutVersionManager dnVersionManager =
-          dsm.getLayoutVersionManager();
-      Assert.assertEquals(0, dnVersionManager.getMetadataLayoutVersion());
-    }
-
-    int countContainers = 0;
-    for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
-      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-      // Also verify that all the existing containers are open.
-      for (Iterator<Container<?>> it =
-           dsm.getContainer().getController().getContainers(); it.hasNext();) {
-        Container container = it.next();
-        Assert.assertTrue(container.getContainerState() ==
-            ContainerProtos.ContainerDataProto.State.OPEN);
-        countContainers++;
-      }
-    }
-    Assert.assertTrue(countContainers >= 1);
-  }
-
-  /*
-   * Helper function to test Post-Upgrade conditions on all the DataNodes.
-   */
-  private void testPostUpgradeConditionsDataNodes(
-      ContainerProtos.ContainerDataProto.State... validClosedContainerStates) {
-    List<ContainerProtos.ContainerDataProto.State> closeStates =
-        Arrays.asList(validClosedContainerStates);
-    // Allow closed and quasi closed containers as valid closed containers by
-    // default.
-    if (closeStates.isEmpty()) {
-      closeStates = Arrays.asList(CLOSED, QUASI_CLOSED);
-    }
-
-    try {
-      GenericTestUtils.waitFor(() -> {
-        for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
-          DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-          try {
-            if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) &&
-                (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) {
-              return false;
-            }
-          } catch (IOException e) {
-            LOG.error("Exception. ", e);
-            return false;
-          }
-        }
-        return true;
-      }, 500, 60000);
-    } catch (TimeoutException | InterruptedException e) {
-      Assert.fail("Timeout waiting for Upgrade to complete on Data Nodes.");
-    }
-
-    int countContainers = 0;
-    for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
-      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-      HDDSLayoutVersionManager dnVersionManager =
-          dsm.getLayoutVersionManager();
-      Assert.assertEquals(dnVersionManager.getSoftwareLayoutVersion(),
-          dnVersionManager.getMetadataLayoutVersion());
-      Assert.assertTrue(dnVersionManager.getMetadataLayoutVersion() >= 1);
-
-      // Also verify that all the existing containers are closed.
-      for (Iterator<Container<?>> it =
-           dsm.getContainer().getController().getContainers(); it.hasNext();) {
-        Container<?> container = it.next();
-        Assert.assertTrue("Container had unexpected state " +
-                container.getContainerState(),
-            closeStates.stream().anyMatch(
-                state -> container.getContainerState().equals(state)));
-        countContainers++;
-      }
-    }
-    Assert.assertTrue(countContainers >= 1);
-  }
-
   /*
    * Helper function to test that we can create new pipelines Post-Upgrade.
    */
@@ -379,31 +236,6 @@ public class TestHDDSUpgrade {
     Assert.assertEquals(pid, ratisPipeline1.getId());
   }
 
-  /*
-   * Helper function to test DataNode state on the SCM. Note that due to
-   * timing constraints, sometime the node-state can transition to the next
-   * state. This function expects the DataNode to be in NodeState "state" or
-   * "alternateState". Some tests can enforce a unique NodeState test by
-   * setting "alternateState = null".
-   */
-  private void testDataNodesStateOnSCM(NodeState state,
-                                       NodeState alternateState) {
-    int countNodes = 0;
-    for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) {
-      try {
-        NodeState dnState =
-            scm.getScmNodeManager().getNodeStatus(dn).getHealth();
-        Assert.assertTrue((dnState == state) ||
-            (alternateState == null ? false : dnState == alternateState));
-      } catch (NodeNotFoundException e) {
-        e.printStackTrace();
-        Assert.fail("Node not found");
-      }
-      ++countNodes;
-    }
-    Assert.assertEquals(NUM_DATA_NODES, countNodes);
-  }
-
   /*
    * Helper function to wait for Pipeline creation.
    */
@@ -443,8 +275,10 @@ public class TestHDDSUpgrade {
     createTestContainers();
 
     // Test the Pre-Upgrade conditions on SCM as well as DataNodes.
-    testPreUpgradeConditionsSCM();
-    testPreUpgradeConditionsDataNodes();
+    TestHddsUpgradeUtils.testPreUpgradeConditionsSCM(
+        cluster.getStorageContainerManagersList());
+    TestHddsUpgradeUtils.testPreUpgradeConditionsDataNodes(
+        cluster.getHddsDatanodes());
 
     Set<PipelineID> preUpgradeOpenPipelines =
         scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
@@ -458,11 +292,8 @@ public class TestHDDSUpgrade {
     Assert.assertEquals(STARTING_FINALIZATION, status.status());
 
     // Wait for the Finalization to complete on the SCM.
-    while (status.status() != FINALIZATION_DONE) {
-      status = scm.getFinalizationManager()
-          .queryUpgradeFinalizationProgress("xyz",
-          false, false);
-    }
+    TestHddsUpgradeUtils.waitForFinalizationFromClient(
+        cluster.getStorageContainerLocationClient(), "xyz");
 
     Set<PipelineID> postUpgradeOpenPipelines =
         scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
@@ -479,15 +310,20 @@ public class TestHDDSUpgrade {
     Assert.assertEquals(0, numPreUpgradeOpenPipelines);
 
     // Verify Post-Upgrade conditions on the SCM.
-    testPostUpgradeConditionsSCM();
+    TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+        cluster.getStorageContainerManagersList(),
+        numContainersCreated, NUM_DATA_NODES);
 
     // All datanodes on the SCM should have moved to HEALTHY-READONLY state.
-    testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+    TestHddsUpgradeUtils.testDataNodesStateOnSCM(
+        cluster.getStorageContainerManagersList(), NUM_DATA_NODES,
+        HEALTHY_READONLY, HEALTHY);
 
     // Verify the SCM has driven all the DataNodes through Layout Upgrade.
     // In the happy path case, no containers should have been quasi closed as
     // a result of the upgrade.
-    testPostUpgradeConditionsDataNodes(CLOSED);
+    TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+        cluster.getHddsDatanodes(), numContainersCreated, CLOSED);
 
     // Test that we can use a pipeline after upgrade.
     // Will fail with exception if there are no pipelines.
@@ -568,7 +404,7 @@ public class TestHDDSUpgrade {
       }
       cluster.waitForClusterToBeReady();
     } catch (Exception e) {
-      LOG.info("DataNode Restarts Failed!");
+      LOG.error("DataNode Restarts Failed!", e);
       testPassed.set(false);
     }
     loadSCMState();
@@ -997,8 +833,10 @@ public class TestHDDSUpgrade {
     createKey();
 
     // Test the Pre-Upgrade conditions on SCM as well as DataNodes.
-    testPreUpgradeConditionsSCM();
-    testPreUpgradeConditionsDataNodes();
+    TestHddsUpgradeUtils.testPreUpgradeConditionsSCM(
+        cluster.getStorageContainerManagersList());
+    TestHddsUpgradeUtils.testPreUpgradeConditionsDataNodes(
+        cluster.getHddsDatanodes());
 
     // Trigger Finalization on the SCM
     StatusAndMessages status =
@@ -1027,18 +865,24 @@ public class TestHDDSUpgrade {
 
     // Verify Post-Upgrade conditions on the SCM.
     // With failure injection
-    testPostUpgradeConditionsSCM();
+    TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+        cluster.getStorageContainerManagersList(), numContainersCreated,
+        NUM_DATA_NODES);
 
     // All datanodes on the SCM should have moved to HEALTHY-READONLY state.
     // Due to timing constraint also allow a "HEALTHY" state.
     loadSCMState();
-    testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+    TestHddsUpgradeUtils.testDataNodesStateOnSCM(
+        cluster.getStorageContainerManagersList(), NUM_DATA_NODES,
+        HEALTHY_READONLY, HEALTHY);
 
     // Need to wait for post finalization heartbeat from DNs.
     LambdaTestUtils.await(600000, 500, () -> {
       try {
         loadSCMState();
-        testDataNodesStateOnSCM(HEALTHY, null);
+        TestHddsUpgradeUtils.testDataNodesStateOnSCM(
+            cluster.getStorageContainerManagersList(), NUM_DATA_NODES,
+            HEALTHY, null);
         sleep(100);
       } catch (Throwable ex) {
         LOG.info(ex.getMessage());
@@ -1048,7 +892,8 @@ public class TestHDDSUpgrade {
     });
 
     // Verify the SCM has driven all the DataNodes through Layout Upgrade.
-    testPostUpgradeConditionsDataNodes();
+    TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+        cluster.getHddsDatanodes(), numContainersCreated);
 
     // Verify that new pipeline can be created with upgraded datanodes.
     try {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
new file mode 100644
index 0000000000..ccedede134
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.upgrade;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.LambdaTestUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+
+/**
+ * Helper methods for testing HDDS upgrade finalization in integration tests.
+ */
+public final class TestHddsUpgradeUtils {
+
+  private TestHddsUpgradeUtils() { }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestHddsUpgradeUtils.class);
+
+  private static final ReplicationConfig RATIS_THREE =
+      
ReplicationConfig.fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE);
+
+  public static void waitForFinalizationFromClient(
+      StorageContainerLocationProtocol scmClient, String clientID)
+      throws Exception {
+    LambdaTestUtils.await(60_000, 1_000, () -> {
+      UpgradeFinalizer.Status status = scmClient
+          .queryUpgradeFinalizationProgress(clientID, true, true)
+          .status();
+      LOG.info("Waiting for upgrade finalization to complete from client." +
+              " Current status is {}.", status);
+      return status == FINALIZATION_DONE || status == ALREADY_FINALIZED;
+    });
+  }
+
+  /*
+   * Helper function to test Pre-Upgrade conditions on the SCM
+   */
+  public static void testPreUpgradeConditionsSCM(
+      List<StorageContainerManager> scms) {
+    for (StorageContainerManager scm : scms) {
+      Assert.assertEquals(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(),
+          scm.getLayoutVersionManager().getMetadataLayoutVersion());
+      for (ContainerInfo ci : scm.getContainerManager()
+          .getContainers()) {
+        Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, ci.getState());
+      }
+    }
+  }
+
+  /*
+   * Helper function to test Post-Upgrade conditions on the SCM
+   */
+  public static void testPostUpgradeConditionsSCM(
+      List<StorageContainerManager> scms, int numContainers, int numDatanodes) 
{
+    for (StorageContainerManager scm : scms) {
+      LOG.info("Testing post upgrade conditions on SCM with node ID: {}",
+          scm.getSCMNodeId());
+      testPostUpgradeConditionsSCM(scm, numContainers, numDatanodes);
+    }
+  }
+
+  public static void testPostUpgradeConditionsSCM(StorageContainerManager scm,
+      int numContainers, int numDatanodes) {
+
+    Assert.assertTrue(scm.getScmContext().getFinalizationCheckpoint()
+        .hasCrossed(FinalizationCheckpoint.FINALIZATION_COMPLETE));
+
+    HDDSLayoutVersionManager scmVersionManager = scm.getLayoutVersionManager();
+    Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(),
+        scmVersionManager.getMetadataLayoutVersion());
+    Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1);
+
+    // SCM should not return from finalization until there is at least one
+    // pipeline to use.
+    PipelineManager scmPipelineManager = scm.getPipelineManager();
+    try {
+      GenericTestUtils.waitFor(() -> {
+        int pipelineCount = scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
+            .size();
+        if (pipelineCount >= 1) {
+          return true;
+        }
+        return false;
+      }, 500, 60000);
+    } catch (TimeoutException | InterruptedException e) {
+      Assert.fail("Timeout waiting for Upgrade to complete on SCM.");
+    }
+
+    // SCM will not return from finalization until there is at least one
+    // RATIS 3 pipeline. For this to exist, all of the datanodes must
+    // be in the HEALTHY state.
+    testDataNodesStateOnSCM(scm, numDatanodes, HEALTHY, HEALTHY_READONLY);
+
+    int countContainers = 0;
+    for (ContainerInfo ci : scm.getContainerManager().getContainers()) {
+      HddsProtos.LifeCycleState ciState = ci.getState();
+      LOG.info("testPostUpgradeConditionsSCM: container state is {}",
+          ciState.name());
+      Assert.assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
+          (ciState == HddsProtos.LifeCycleState.CLOSING) ||
+          (ciState == HddsProtos.LifeCycleState.DELETING) ||
+          (ciState == HddsProtos.LifeCycleState.DELETED) ||
+          (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
+      countContainers++;
+    }
+    Assert.assertTrue(countContainers >= numContainers);
+  }
+
+  /*
+   * Helper function to test Pre-Upgrade conditions on all the DataNodes.
+   */
+  public static void testPreUpgradeConditionsDataNodes(
+        List<HddsDatanodeService> datanodes) {
+    for (HddsDatanodeService dataNode : datanodes) {
+      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+      HDDSLayoutVersionManager dnVersionManager =
+          dsm.getLayoutVersionManager();
+      Assert.assertEquals(0, dnVersionManager.getMetadataLayoutVersion());
+    }
+
+    int countContainers = 0;
+    for (HddsDatanodeService dataNode : datanodes) {
+      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+      // Also verify that all the existing containers are open.
+      for (Iterator<Container<?>> it =
+           dsm.getContainer().getController().getContainers(); it.hasNext();) {
+        Container container = it.next();
+        Assert.assertSame(container.getContainerState(),
+            ContainerProtos.ContainerDataProto.State.OPEN);
+        countContainers++;
+      }
+    }
+    Assert.assertTrue(countContainers >= 1);
+  }
+
+  /*
+   * Helper function to test Post-Upgrade conditions on all the DataNodes.
+   */
+  public static void testPostUpgradeConditionsDataNodes(
+      List<HddsDatanodeService> datanodes, int numContainers,
+      ContainerProtos.ContainerDataProto.State... validClosedContainerStates) {
+    List<ContainerProtos.ContainerDataProto.State> closeStates =
+        Arrays.asList(validClosedContainerStates);
+    // Allow closed and quasi closed containers as valid closed containers by
+    // default.
+    if (closeStates.isEmpty()) {
+      closeStates = Arrays.asList(CLOSED, QUASI_CLOSED);
+    }
+
+    try {
+      GenericTestUtils.waitFor(() -> {
+        for (HddsDatanodeService dataNode : datanodes) {
+          DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+          try {
+            if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) &&
+                (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) {
+              return false;
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to query datanode upgrade status.", e);
+            return false;
+          }
+        }
+        return true;
+      }, 500, 60000);
+    } catch (TimeoutException | InterruptedException e) {
+      Assert.fail("Timeout waiting for Upgrade to complete on Data Nodes.");
+    }
+
+    int countContainers = 0;
+    for (HddsDatanodeService dataNode : datanodes) {
+      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+      HDDSLayoutVersionManager dnVersionManager =
+          dsm.getLayoutVersionManager();
+      Assert.assertEquals(dnVersionManager.getSoftwareLayoutVersion(),
+          dnVersionManager.getMetadataLayoutVersion());
+      Assert.assertTrue(dnVersionManager.getMetadataLayoutVersion() >= 1);
+
+      // Also verify that all the existing containers are closed.
+      for (Iterator<Container<?>> it =
+           dsm.getContainer().getController().getContainers(); it.hasNext();) {
+        Container<?> container = it.next();
+        Assert.assertTrue("Container had unexpected state " +
+                container.getContainerState(),
+            closeStates.stream().anyMatch(
+                state -> container.getContainerState().equals(state)));
+        countContainers++;
+      }
+    }
+    Assert.assertTrue(countContainers >= numContainers);
+  }
+
+  public static void testDataNodesStateOnSCM(List<StorageContainerManager> 
scms,
+      int expectedDatanodeCount, HddsProtos.NodeState state,
+      HddsProtos.NodeState alternateState) {
+    scms.forEach(scm -> testDataNodesStateOnSCM(scm, expectedDatanodeCount,
+        state, alternateState));
+  }
+
+  /*
+   * Helper function to test DataNode state on the SCM. Note that due to
+   * timing constraints, sometimes the node-state can transition to the next
+   * state. This function expects the DataNode to be in NodeState "state" or
+   * "alternateState". Some tests can enforce a unique NodeState test by
+   * setting "alternateState = null".
+   */
+  public static void testDataNodesStateOnSCM(StorageContainerManager scm,
+      int expectedDatanodeCount, HddsProtos.NodeState state,
+      HddsProtos.NodeState alternateState) {
+    int countNodes = 0;
+    for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) {
+      try {
+        HddsProtos.NodeState dnState =
+            scm.getScmNodeManager().getNodeStatus(dn).getHealth();
+        Assert.assertTrue((dnState == state) ||
+            (alternateState != null && dnState == alternateState));
+      } catch (NodeNotFoundException e) {
+        e.printStackTrace();
+        Assert.fail("Node not found");
+      }
+      ++countNodes;
+    }
+    Assert.assertEquals(expectedDatanodeCount, countNodes);
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
new file mode 100644
index 0000000000..d6696b3a70
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.upgrade;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationStateManagerImpl;
+import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.upgrade.DefaultUpgradeFinalizationExecutor;
+import 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor;
+import org.apache.hadoop.ozone.upgrade.UpgradeTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+
+/**
+ * Tests upgrade finalization failure scenarios and corner cases specific to 
SCM
+ * HA.
+ */
+public class TestScmHAFinalization {
+  private static final String CLIENT_ID = UUID.randomUUID().toString();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestScmHAFinalization.class);
+  private static final String METHOD_SOURCE =
+      "org.apache.hadoop.hdds.upgrade" +
+          ".TestScmHAFinalization#injectionPointsToTest";
+
+  private StorageContainerLocationProtocol scmClient;
+  private MiniOzoneHAClusterImpl cluster;
+  private static final int NUM_DATANODES = 3;
+  private static final int NUM_SCMS = 3;
+  private Future<?> finalizationFuture;
+
+  public void init(OzoneConfiguration conf,
+      UpgradeFinalizationExecutor<SCMUpgradeFinalizationContext> executor,
+      int numInactiveSCMs) throws Exception {
+
+    SCMConfigurator configurator = new SCMConfigurator();
+    configurator.setUpgradeFinalizationExecutor(executor);
+
+    MiniOzoneCluster.Builder clusterBuilder =
+        new MiniOzoneHAClusterImpl.Builder(conf)
+        .setNumOfStorageContainerManagers(NUM_SCMS)
+        .setNumOfActiveSCMs(NUM_SCMS - numInactiveSCMs)
+        .setScmLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion())
+        .setSCMServiceId("scmservice")
+        .setSCMConfigurator(configurator)
+        .setNumOfOzoneManagers(1)
+        .setNumDatanodes(NUM_DATANODES)
+        .setDnLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    this.cluster = (MiniOzoneHAClusterImpl) clusterBuilder.build();
+
+    scmClient = cluster.getStorageContainerLocationClient();
+    cluster.waitForClusterToBeReady();
+
+    // Launch finalization from the client. In the current implementation,
+    // this call will block until finalization completes. If the test
+    // involves restarts or leader changes the client may be disconnected,
+    // but finalization should still proceed.
+    finalizationFuture = Executors.newSingleThreadExecutor().submit(
+        () -> {
+          try {
+            scmClient.finalizeScmUpgrade(CLIENT_ID);
+          } catch (IOException ex) {
+            LOG.info("finalization client failed. This may be expected if the" 
+
+                " test injected failures.", ex);
+          }
+        });
+  }
+
+  @AfterEach
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Argument supplier for parameterized tests.
+   */
+  public static Stream<Arguments> injectionPointsToTest() {
+    // Do not test from BEFORE_PRE_FINALIZE_UPGRADE injection point.
+    // Finalization will not have started so there will be no persisted state
+    // to resume from.
+    return Stream.of(
+        Arguments.of(UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE),
+        Arguments.of(UpgradeTestInjectionPoints.AFTER_COMPLETE_FINALIZATION),
+        Arguments.of(UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource(METHOD_SOURCE)
+  public void testFinalizationWithLeaderChange(
+      UpgradeTestInjectionPoints haltingPoint) throws Exception {
+
+    CountDownLatch pauseLatch = new CountDownLatch(1);
+    CountDownLatch unpauseLatch = new CountDownLatch(1);
+    init(new OzoneConfiguration(),
+        UpgradeTestUtils.newPausingFinalizationExecutor(haltingPoint,
+            pauseLatch, unpauseLatch, LOG), 0);
+    pauseLatch.await();
+
+    // Stop the leader, forcing a leader change in the middle of finalization.
+    // This will cause the initial client call for finalization
+    // to be interrupted.
+    StorageContainerManager oldLeaderScm = cluster.getActiveSCM();
+    LOG.info("Stopping current SCM leader {} to initiate a leader change.",
+        oldLeaderScm.getSCMNodeId());
+    cluster.shutdownStorageContainerManager(oldLeaderScm);
+
+    // While finalization is paused, check its state on the remaining SCMs.
+    checkMidFinalizationConditions(haltingPoint,
+        cluster.getStorageContainerManagersList());
+
+    // Wait for the remaining two SCMs to elect a new leader.
+    cluster.waitForClusterToBeReady();
+
+    // Restart actually creates a new SCM.
+    // Since this SCM will be a follower, the implementation of its upgrade
+    // finalization executor does not matter for this test.
+    cluster.restartStorageContainerManager(oldLeaderScm, true);
+
+    // Make sure the original SCM leader is not the leader anymore.
+    StorageContainerManager newLeaderScm  = cluster.getActiveSCM();
+    Assertions.assertNotEquals(newLeaderScm.getSCMNodeId(),
+        oldLeaderScm.getSCMNodeId());
+
+    // Resume finalization from the new leader.
+    unpauseLatch.countDown();
+
+    // Client should complete exceptionally since the original SCM it
+    // sent its request to was restarted.
+    finalizationFuture.get();
+    TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+    // Make sure old leader has caught up and all SCMs have finalized.
+    waitForScmsToFinalize(cluster.getStorageContainerManagersList());
+
+    TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+        cluster.getStorageContainerManagersList(), 0, NUM_DATANODES);
+    TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+        cluster.getHddsDatanodes(), 0, CLOSED);
+  }
+
+  @ParameterizedTest
+  @MethodSource(METHOD_SOURCE)
+  public void testFinalizationWithRestart(
+      UpgradeTestInjectionPoints haltingPoint) throws Exception {
+    CountDownLatch terminateLatch = new CountDownLatch(1);
+    init(new OzoneConfiguration(),
+        UpgradeTestUtils.newTerminatingFinalizationExecutor(haltingPoint,
+            terminateLatch, LOG),
+        0);
+    terminateLatch.await();
+
+    // Once upgrade finalization is stopped at the halting point, restart all
+    // SCMs.
+    LOG.info("Restarting all SCMs during upgrade finalization.");
+    // Restarting an SCM from mini ozone actually replaces the SCM with a new
+    // instance. We will use the normal upgrade finalization executor for
+    // these new instances, since the last one aborted at the halting point.
+    cluster.getSCMConfigurator()
+        .setUpgradeFinalizationExecutor(
+            new DefaultUpgradeFinalizationExecutor<>());
+    List<StorageContainerManager> originalSCMs =
+        cluster.getStorageContainerManagers();
+
+    for (StorageContainerManager scm: originalSCMs) {
+      cluster.restartStorageContainerManager(scm, false);
+    }
+
+    checkMidFinalizationConditions(haltingPoint,
+        cluster.getStorageContainerManagersList());
+
+    // After all SCMs were restarted, finalization should resume
+    // automatically once a leader is elected.
+    cluster.waitForClusterToBeReady();
+
+    finalizationFuture.get();
+    TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+    // Once the leader tells the client finalization is complete, wait for all
+    // followers to catch up so we can check their state.
+    waitForScmsToFinalize(cluster.getStorageContainerManagersList());
+
+    TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+        cluster.getStorageContainerManagersList(), 0, NUM_DATANODES);
+    TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+        cluster.getHddsDatanodes(), 0, CLOSED);
+  }
+
+  @Test
+  public void testSnapshotFinalization() throws Exception {
+    int numInactiveSCMs = 1;
+    // Require snapshot installation after only a few transactions.
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_ENABLED, true);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP, 5);
+    conf.setLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD,
+        5);
+
+    init(conf, new DefaultUpgradeFinalizationExecutor<>(), numInactiveSCMs);
+
+    GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
+        .captureLogs(FinalizationStateManagerImpl.LOG);
+
+    StorageContainerManager inactiveScm = cluster.getInactiveSCM().next();
+    LOG.info("Inactive SCM node ID: {}", inactiveScm.getSCMNodeId());
+
+    List<StorageContainerManager> scms =
+        cluster.getStorageContainerManagersList();
+    List<StorageContainerManager> activeScms = new ArrayList<>();
+    for (StorageContainerManager scm : scms) {
+      if (!scm.getSCMNodeId().equals(inactiveScm.getSCMNodeId())) {
+        activeScms.add(scm);
+      }
+    }
+
+    // Wait for finalization from the client perspective.
+    finalizationFuture.get();
+    TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+    // Wait for two running SCMs to finish finalization.
+    waitForScmsToFinalize(activeScms);
+
+    TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+        activeScms, 0, NUM_DATANODES);
+    TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+        cluster.getHddsDatanodes(), 0, CLOSED);
+
+    // Move SCM log index farther ahead to make sure a snapshot install
+    // happens on the restarted SCM.
+    for (int i = 0; i < 10; i++) {
+      ContainerWithPipeline container =
+          scmClient.allocateContainer(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.ONE, "owner");
+      scmClient.closeContainer(
+          container.getContainerInfo().getContainerID());
+    }
+
+    cluster.startInactiveSCM(inactiveScm.getSCMNodeId());
+    waitForScmToFinalize(inactiveScm);
+
+    TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+        inactiveScm, 0, NUM_DATANODES);
+
+    // Use log to verify a snapshot was installed.
+    Assertions.assertTrue(logCapture.getOutput().contains("New SCM snapshot " +
+        "received with metadata layout version"));
+  }
+
+  private void waitForScmsToFinalize(Collection<StorageContainerManager> scms)
+      throws Exception {
+    for (StorageContainerManager scm: scms) {
+      waitForScmToFinalize(scm);
+    }
+  }
+
+  private void waitForScmToFinalize(StorageContainerManager scm)
+      throws Exception {
+    GenericTestUtils.waitFor(() -> !scm.isInSafeMode(), 500, 5000);
+    GenericTestUtils.waitFor(() -> {
+      FinalizationCheckpoint checkpoint =
+          scm.getScmContext().getFinalizationCheckpoint();
+      LOG.info("Waiting for SCM {} (leader? {}) to finalize. Current " +
+          "finalization checkpoint is {}",
+          scm.getSCMNodeId(), scm.checkLeader(), checkpoint);
+      return checkpoint.hasCrossed(
+          FinalizationCheckpoint.FINALIZATION_COMPLETE);
+    }, 2_000, 60_000);
+  }
+
+  private void checkMidFinalizationConditions(
+      UpgradeTestInjectionPoints haltingPoint,
+      List<StorageContainerManager> scms) {
+    for (StorageContainerManager scm: scms) {
+      switch (haltingPoint) {
+      case BEFORE_PRE_FINALIZE_UPGRADE:
+        Assertions.assertFalse(
+            scm.getPipelineManager().isPipelineCreationFrozen());
+        Assertions.assertEquals(
+            scm.getScmContext().getFinalizationCheckpoint(),
+            FinalizationCheckpoint.FINALIZATION_REQUIRED);
+        break;
+      case AFTER_PRE_FINALIZE_UPGRADE:
+        Assertions.assertTrue(
+            scm.getPipelineManager().isPipelineCreationFrozen());
+        Assertions.assertEquals(
+            scm.getScmContext().getFinalizationCheckpoint(),
+            FinalizationCheckpoint.FINALIZATION_STARTED);
+        break;
+      case AFTER_COMPLETE_FINALIZATION:
+        Assertions.assertFalse(
+            scm.getPipelineManager().isPipelineCreationFrozen());
+        Assertions.assertEquals(
+            scm.getScmContext().getFinalizationCheckpoint(),
+            FinalizationCheckpoint.MLV_EQUALS_SLV);
+        break;
+      case AFTER_POST_FINALIZE_UPGRADE:
+        Assertions.assertFalse(
+            scm.getPipelineManager().isPipelineCreationFrozen());
+        Assertions.assertEquals(
+            scm.getScmContext().getFinalizationCheckpoint(),
+            FinalizationCheckpoint.FINALIZATION_COMPLETE);
+        break;
+      default:
+        Assertions.fail("Unknown halting point in test: " + haltingPoint);
+      }
+    }
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 8e8de6df6a..1bdaef3a9b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
 import 
org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -115,6 +115,7 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
       LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
 
   private OzoneConfiguration conf;
+  private final SCMConfigurator scmConfigurator;
   private StorageContainerManager scm;
   private OzoneManager ozoneManager;
   private final List<HddsDatanodeService> hddsDatanodes;
@@ -124,27 +125,13 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
   private int waitForClusterToBeReadyTimeout = 120000; // 2 min
   private CertificateClient caClient;
 
-  /**
-   * Creates a new MiniOzoneCluster.
-   *
-   * @throws IOException if there is an I/O error
-   */
-  protected MiniOzoneClusterImpl(OzoneConfiguration conf,
-                                 OzoneManager ozoneManager,
-                                 StorageContainerManager scm,
-                                 List<HddsDatanodeService> hddsDatanodes) {
-    this.conf = conf;
-    this.ozoneManager = ozoneManager;
-    this.scm = scm;
-    this.hddsDatanodes = hddsDatanodes;
-  }
-
   /**
    * Creates a new MiniOzoneCluster with Recon.
    *
    * @throws IOException if there is an I/O error
    */
   MiniOzoneClusterImpl(OzoneConfiguration conf,
+                       SCMConfigurator scmConfigurator,
                        OzoneManager ozoneManager,
                        StorageContainerManager scm,
                        List<HddsDatanodeService> hddsDatanodes,
@@ -154,6 +141,7 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
     this.scm = scm;
     this.hddsDatanodes = hddsDatanodes;
     this.reconServer = reconServer;
+    this.scmConfigurator = scmConfigurator;
   }
 
   /**
@@ -165,13 +153,18 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
    * @param conf
    * @param hddsDatanodes
    */
-  MiniOzoneClusterImpl(OzoneConfiguration conf,
+  MiniOzoneClusterImpl(OzoneConfiguration conf, SCMConfigurator 
scmConfigurator,
       List<HddsDatanodeService> hddsDatanodes, ReconServer reconServer) {
+    this.scmConfigurator = scmConfigurator;
     this.conf = conf;
     this.hddsDatanodes = hddsDatanodes;
     this.reconServer = reconServer;
   }
 
+  public SCMConfigurator getSCMConfigurator() {
+    return scmConfigurator;
+  }
+
   @Override
   public OzoneConfiguration getConf() {
     return conf;
@@ -342,11 +335,6 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
   @Override
   public StorageContainerLocationProtocolClientSideTranslatorPB
       getStorageContainerLocationClient() throws IOException {
-    InetSocketAddress address = scm.getClientRpcAddress();
-    LOG.info(
-        "Creating StorageContainerLocationProtocol RPC client with address {}",
-        address);
-
     SCMContainerLocationFailoverProxyProvider proxyProvider =
         new SCMContainerLocationFailoverProxyProvider(conf, null);
 
@@ -361,7 +349,7 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
     LOG.info("Restarting SCM in cluster " + this.getClass());
     scm.stop();
     scm.join();
-    scm = HddsTestUtils.getScmSimple(conf);
+    scm = HddsTestUtils.getScmSimple(conf, scmConfigurator);
     scm.start();
     if (waitForDatanode) {
       waitForClusterToBeReady();
@@ -612,7 +600,8 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
         hddsDatanodes = createHddsDatanodes(
             Collections.singletonList(scm), reconServer);
 
-        MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
+        MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf,
+            scmConfigurator, om, scm,
             hddsDatanodes, reconServer);
 
         cluster.setCAClient(certClient);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 158cf1086f..60cb1a16c9 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.ha.CheckedConsumer;
 import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -94,12 +95,13 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
    */
   public MiniOzoneHAClusterImpl(
       OzoneConfiguration conf,
+      SCMConfigurator scmConfigurator,
       OMHAService omhaService,
       SCMHAService scmhaService,
       List<HddsDatanodeService> hddsDatanodes,
       String clusterPath,
       ReconServer reconServer) {
-    super(conf, hddsDatanodes, reconServer);
+    super(conf, scmConfigurator, hddsDatanodes, reconServer);
     this.omhaService = omhaService;
     this.scmhaService = scmhaService;
     this.clusterMetaPath = clusterPath;
@@ -249,23 +251,25 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
     LOG.info("Shutting down StorageContainerManager " + scm.getScmId());
 
     scm.stop();
-    scmhaService.deactivate(scm);
+    scmhaService.removeInstance(scm);
   }
 
-  public void restartStorageContainerManager(
+  public StorageContainerManager restartStorageContainerManager(
       StorageContainerManager scm, boolean waitForSCM)
       throws IOException, TimeoutException,
       InterruptedException, AuthenticationException {
     LOG.info("Restarting SCM in cluster " + this.getClass());
+    scmhaService.removeInstance(scm);
     OzoneConfiguration scmConf = scm.getConfiguration();
     shutdownStorageContainerManager(scm);
     scm.join();
-    scm = HddsTestUtils.getScmSimple(scmConf);
-    scmhaService.activate(scm);
+    scm = HddsTestUtils.getScmSimple(scmConf, getSCMConfigurator());
+    scmhaService.addInstance(scm, true);
     scm.start();
     if (waitForSCM) {
       waitForClusterToBeReady();
     }
+    return scm;
   }
 
   public String getClusterId() throws IOException {
@@ -433,7 +437,8 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
           scmService.getActiveServices(), reconServer);
 
       MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf,
-          omService, scmService, hddsDatanodes, path, reconServer);
+          scmConfigurator, omService, scmService, hddsDatanodes, path,
+          reconServer);
 
       if (startDataNodes) {
         cluster.startHddsDatanodes();
@@ -583,7 +588,8 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
             } else {
               StorageContainerManager.scmBootstrap(scmConfig);
             }
-            StorageContainerManager scm = 
HddsTestUtils.getScmSimple(scmConfig);
+            StorageContainerManager scm =
+                HddsTestUtils.getScmSimple(scmConfig, scmConfigurator);
             HealthyPipelineSafeModeRule rule =
                 scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule();
             if (rule != null) {
@@ -977,7 +983,11 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
     }
 
     public boolean removeInstance(Type t) {
-      return services.remove(t);
+      boolean result =  services.remove(t);
+      serviceMap.remove(serviceIdProvider.apply(t));
+      activeServices.remove(t);
+      inactiveServices.remove(t);
+      return result;
     }
 
     public void addInstance(Type t, boolean isActive) {
@@ -1053,7 +1063,7 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
   }
 
   public List<StorageContainerManager> getStorageContainerManagers() {
-    return this.scmhaService.getServices();
+    return new ArrayList<>(this.scmhaService.getServices());
   }
 
   public StorageContainerManager getStorageContainerManager() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to