This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-nonrolling-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-nonrolling-upgrade 
by this push:
     new 7266f32  HDDS-4914. Failure injection and validating HDDS upgrade. 
(#1998)
7266f32 is described below

commit 7266f322c56a7b731996d541a856d468d5c42b2a
Author: prashantpogde <[email protected]>
AuthorDate: Wed Apr 14 07:48:09 2021 -0700

    HDDS-4914. Failure injection and validating HDDS upgrade. (#1998)
---
 .../ozone/upgrade/BasicUpgradeFinalizer.java       |  34 +-
 .../DefaultUpgradeFinalizationExecutor.java        |  78 ++
 .../hadoop/ozone/upgrade/UpgradeFinalizer.java     |   8 +-
 .../InjectedUpgradeFinalizationExecutor.java       | 133 ++++
 .../hadoop/ozone/upgrade/TestUpgradeFinalizer.java |  16 +
 .../common/statemachine/DatanodeStateMachine.java  |  13 +-
 .../upgrade/DataNodeUpgradeFinalizer.java          |  82 +--
 .../scm/pipeline/BackgroundPipelineCreator.java    |   1 +
 .../hdds/scm/server/StorageContainerManager.java   |   6 +-
 .../scm/server/upgrade/SCMUpgradeFinalizer.java    |  84 +--
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       | 805 ++++++++++++++++++++-
 .../ozone/om/upgrade/OMUpgradeFinalizer.java       |  83 +--
 12 files changed, 1160 insertions(+), 183 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
index 04ffead..1c0e3d3 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
@@ -57,18 +57,44 @@ public abstract class BasicUpgradeFinalizer
   protected V versionManager;
   protected String clientID;
   protected T component;
+  protected DefaultUpgradeFinalizationExecutor finalizationExecutor;
 
   private Queue<String> msgs = new ConcurrentLinkedQueue<>();
   protected boolean isDone = false;
 
   public BasicUpgradeFinalizer(V versionManager) {
     this.versionManager = versionManager;
+    this.finalizationExecutor =
+        new DefaultUpgradeFinalizationExecutor();
+  }
+
+  /**
+   * Sets the Finalization Executor driver.
+   * @param executor FinalizationExecutor.
+   */
+
+  public void setFinalizationExecutor(DefaultUpgradeFinalizationExecutor
+                                          executor) {
+    finalizationExecutor = executor;
+  }
+
+  @Override
+  public DefaultUpgradeFinalizationExecutor getFinalizationExecutor() {
+    return finalizationExecutor;
   }
 
   public boolean isFinalizationDone() {
     return isDone;
   }
 
+  public void markFinalizationDone() {
+    isDone = true;
+  }
+
+  public V getVersionManager() {
+    return versionManager;
+  }
+
   public synchronized StatusAndMessages preFinalize(String upgradeClientID,
                                                     T id)
       throws UpgradeException {
@@ -182,7 +208,6 @@ public abstract class BasicUpgradeFinalizer
         || status.equals(FINALIZATION_DONE);
   }
 
-
   protected void finalizeFeature(LayoutFeature feature, Storage config,
                                  Optional<? extends UpgradeAction> action)
       throws UpgradeException {
@@ -436,4 +461,11 @@ public abstract class BasicUpgradeFinalizer
   protected void updateLayoutVersionInDB(V vm, T comp) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  protected abstract void postFinalizeUpgrade() throws IOException;
+
+  protected abstract void finalizeUpgrade(Storage storageConfig)
+      throws UpgradeException;
+
+  protected abstract boolean preFinalizeUpgrade() throws IOException;
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
new file mode 100644
index 0000000..14c1f2d
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
+import org.apache.hadoop.ozone.common.Storage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DefaultUpgradeFinalizationExecutor for driving the main part of finalization.
+ * Unit/Integration tests can override this to provide an error-injected
+ * version of this class.
+ */
+
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class DefaultUpgradeFinalizationExecutor {
+  static final Logger LOG =
+      LoggerFactory.getLogger(DefaultUpgradeFinalizationExecutor.class);
+
+  public DefaultUpgradeFinalizationExecutor() {
+  }
+
+  public Void execute(Storage storageConfig,
+                      BasicUpgradeFinalizer basicUpgradeFinalizer)
+      throws Exception {
+    try {
+      basicUpgradeFinalizer.emitStartingMsg();
+      basicUpgradeFinalizer.getVersionManager()
+          .setUpgradeState(FINALIZATION_IN_PROGRESS);
+
+      /*
+       * Before we can finalize the feature, we need to make sure that all
+       * existing pipelines are closed and that the pipeline manager has
+       * frozen all new pipeline creation.
+       */
+      if(!basicUpgradeFinalizer.preFinalizeUpgrade()) {
+        return null;
+      }
+
+      basicUpgradeFinalizer.finalizeUpgrade(storageConfig);
+
+      basicUpgradeFinalizer.postFinalizeUpgrade();
+
+      basicUpgradeFinalizer.emitFinishedMsg();
+      return null;
+    } catch (Exception e) {
+      LOG.warn("Upgrade Finalization failed with following Exception:");
+      e.printStackTrace();
+      if (basicUpgradeFinalizer.getVersionManager().needsFinalization()) {
+        basicUpgradeFinalizer.getVersionManager()
+            .setUpgradeState(FINALIZATION_REQUIRED);
+        throw (e);
+      }
+    } finally {
+      basicUpgradeFinalizer.markFinalizationDone();
+    }
+    return null;
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
index ffa6e06..59387b3 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
@@ -64,7 +64,7 @@ public interface UpgradeFinalizer<T> {
     STARTING_FINALIZATION,
     FINALIZATION_IN_PROGRESS,
     FINALIZATION_DONE,
-    FINALIZATION_REQUIRED
+    FINALIZATION_REQUIRED,
   }
 
   /**
@@ -72,7 +72,6 @@ public interface UpgradeFinalizer<T> {
    * ongoing, the messages that should be passed to the initiating client of
    * finalization.
    * This translates to a counterpart in the RPC layer.
-   * @see UpgradeFinalizationStatus in OMClientProtocol.proto
    */
   class StatusAndMessages {
     private Status status;
@@ -187,4 +186,9 @@ public interface UpgradeFinalizer<T> {
    */
   void runPrefinalizeStateActions(Storage storage, T service)
       throws IOException;
+
+  /**
+   * Gets the Finalization Executor driver.
+   */
+  DefaultUpgradeFinalizationExecutor getFinalizationExecutor();
 }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
new file mode 100644
index 0000000..6b512b6
--- /dev/null
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_COMPLETE_FINALIZATION;
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE;
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE;
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.BEFORE_PRE_FINALIZE_UPGRADE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.ozone.common.Storage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Failure injected extension of DefaultUpgradeFinalizationExecutor that
+ * can be used by Unit/Integration Tests.
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class InjectedUpgradeFinalizationExecutor extends
+    DefaultUpgradeFinalizationExecutor {
+  static final Logger LOG =
+      LoggerFactory.getLogger(InjectedUpgradeFinalizationExecutor.class);
+
+  private Callable<Boolean> injectTestFunction;
+  private UpgradeTestInjectionPoints testInjectionPoint;
+
+  public enum UpgradeTestInjectionPoints {
+    BEFORE_PRE_FINALIZE_UPGRADE(1),
+    AFTER_PRE_FINALIZE_UPGRADE(2),
+    AFTER_COMPLETE_FINALIZATION(4),
+    AFTER_POST_FINALIZE_UPGRADE(5);
+
+    private int val;
+    UpgradeTestInjectionPoints(int value) {
+      val = value;
+    }
+
+    public int getValue() {
+      return val;
+    }
+  }
+
+  static class UpgradeTestInjectionAbort extends Exception {
+    UpgradeTestInjectionAbort() {
+    }
+  }
+
+  @Override
+  public Void execute(Storage storageConfig,
+                      BasicUpgradeFinalizer basicUpgradeFinalizer)
+      throws Exception {
+    try {
+      injectTestFunctionAtThisPoint(BEFORE_PRE_FINALIZE_UPGRADE);
+      basicUpgradeFinalizer.emitStartingMsg();
+      basicUpgradeFinalizer.getVersionManager()
+          .setUpgradeState(FINALIZATION_IN_PROGRESS);
+
+      if(!basicUpgradeFinalizer.preFinalizeUpgrade()) {
+        return null;
+      }
+      injectTestFunctionAtThisPoint(AFTER_PRE_FINALIZE_UPGRADE);
+
+      basicUpgradeFinalizer.finalizeUpgrade(storageConfig);
+
+      injectTestFunctionAtThisPoint(AFTER_COMPLETE_FINALIZATION);
+
+      basicUpgradeFinalizer.postFinalizeUpgrade();
+      injectTestFunctionAtThisPoint(AFTER_POST_FINALIZE_UPGRADE);
+
+      basicUpgradeFinalizer.emitFinishedMsg();
+      return null;
+    } catch (Exception e) {
+      LOG.warn("Upgrade Finalization failed with following Exception:");
+      e.printStackTrace();
+      if (basicUpgradeFinalizer.getVersionManager().needsFinalization()) {
+        basicUpgradeFinalizer.getVersionManager()
+            .setUpgradeState(FINALIZATION_REQUIRED);
+      }
+    } finally {
+      basicUpgradeFinalizer.markFinalizationDone();
+    }
+    return null;
+  }
+
+  /**
+   * Interface to inject arbitrary failures for stress testing.
+   * @param injectedTestFunction function that will be called when code
+   *        execution reaches the injectTestFunctionAtThisPoint() location.
+   * @param pointIndex code execution point for a given thread.
+   */
+  public void configureTestInjectionFunction(
+      UpgradeTestInjectionPoints pointIndex,
+      Callable<Boolean> injectedTestFunction) {
+    injectTestFunction = injectedTestFunction;
+    testInjectionPoint = pointIndex;
+  }
+
+  /**
+   * Interface to inject error at a given point in an upgrade thread.
+   * @param pointIndex TestFunction Injection point in an upgrade thread.
+   * @throws Exception UpgradeTestInjectionAbort if the injected function
+   *          returns true, or whatever the injected function itself throws.
+   */
+  public void injectTestFunctionAtThisPoint(
+      UpgradeTestInjectionPoints pointIndex) throws Exception {
+    if ((testInjectionPoint != null) &&
+        (pointIndex.getValue() == testInjectionPoint.getValue()) &&
+        (injectTestFunction != null) && injectTestFunction.call()) {
+      throw new UpgradeTestInjectionAbort();
+    }
+    return;
+  }
+}
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
index 1a0fefd..a6d6d57 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
@@ -130,6 +130,22 @@ public class TestUpgradeFinalizer {
     }
 
     @Override
+    protected void postFinalizeUpgrade() throws IOException {
+      return;
+    }
+
+    @Override
+    protected void finalizeUpgrade(Storage storageConfig)
+        throws UpgradeException {
+      return;
+    }
+
+    @Override
+    protected boolean preFinalizeUpgrade() throws IOException {
+      return false;
+    }
+
+    @Override
     public void runPrefinalizeStateActions(Storage storage,
                                            MockComponent mockComponent)
         throws IOException {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 029899b..45cf48e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -60,6 +61,7 @@ import 
org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
 import org.apache.hadoop.ozone.container.upgrade.DataNodeUpgradeFinalizer;
 import org.apache.hadoop.ozone.container.upgrade.DatanodeMetadataFeatures;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.Time;
@@ -601,8 +603,7 @@ public class DatanodeStateMachine implements Closeable {
     return layoutStorage;
   }
 
-  @VisibleForTesting
-  public boolean canFinalizeDataNode() {
+  private boolean canFinalizeDataNode() {
     // Lets be sure that we do not have any open container before we return
     // from here. This function should be called in its own finalizer thread
     // context.
@@ -610,10 +611,13 @@ public class DatanodeStateMachine implements Closeable {
         getContainer().getController().getContainers();
     while (containerIt.hasNext()) {
       Container ctr = containerIt.next();
-      switch (ctr.getContainerState()) {
+      State state = ctr.getContainerState();
+      switch (state) {
       case OPEN:
       case CLOSING:
       case UNHEALTHY:
+        LOG.warn("FinalizeUpgrade : Waiting for container to close, current " +
+            "state is: {}", state);
         return false;
       default:
         continue;
@@ -642,4 +646,7 @@ public class DatanodeStateMachine implements Closeable {
     return upgradeFinalizer.reportStatus(datanodeDetails.getUuidString(),
         false);
   }
+  public UpgradeFinalizer<DatanodeStateMachine> getUpgradeFinalizer() {
+    return upgradeFinalizer;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
index df4ebde..f4e7026 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
@@ -19,13 +19,10 @@
 package org.apache.hadoop.ozone.container.upgrade;
 
 import static 
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -33,12 +30,18 @@ import org.apache.hadoop.ozone.common.Storage;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * UpgradeFinalizer for the DataNode.
  */
 public class DataNodeUpgradeFinalizer extends
     BasicUpgradeFinalizer<DatanodeStateMachine, HDDSLayoutVersionManager> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataNodeUpgradeFinalizer.class);
+  private DatanodeStateMachine datanodeStateMachine;
 
   public DataNodeUpgradeFinalizer(HDDSLayoutVersionManager versionManager,
                                   String optionalClientID) {
@@ -50,54 +53,51 @@ public class DataNodeUpgradeFinalizer extends
   public StatusAndMessages finalize(String upgradeClientID,
                                     DatanodeStateMachine dsm)
       throws IOException {
+    datanodeStateMachine = dsm;
     StatusAndMessages response = preFinalize(upgradeClientID, dsm);
     if (response.status() != FINALIZATION_REQUIRED) {
       return response;
     }
-    new Worker(dsm).call();
+    try {
+      getFinalizationExecutor().execute(dsm.getLayoutStorage(),
+          this);
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new IOException(e.getMessage());
+    }
     return STARTING_MSG;
   }
 
-  private class Worker implements Callable<Void> {
-    private DatanodeStateMachine datanodeStateMachine;
-
-    /**
-     * Initiates the Worker, for the specified DataNode instance.
-     * @param dsm the DataNodeStateMachine instance on which to finalize the
-     *           new LayoutFeatures.
-     */
-    Worker(DatanodeStateMachine dsm) {
-      datanodeStateMachine = dsm;
+  @Override
+  public boolean preFinalizeUpgrade() throws IOException {
+    if(!datanodeStateMachine.preFinalizeUpgrade()) {
+      // DataNode is not yet ready to finalize.
+      // Reset the Finalization state.
+      versionManager.setUpgradeState(FINALIZATION_REQUIRED);
+      String msg = "Pre Finalization checks failed on the DataNode.";
+      logAndEmit(msg);
+      return false;
     }
+    return true;
+  }
 
-    @Override
-    public Void call() throws IOException {
-      if(!datanodeStateMachine.preFinalizeUpgrade()) {
-      // datanode is not yet ready to finalize.
-      // Reset the Finalization state.
-        versionManager.setUpgradeState(FINALIZATION_REQUIRED);
-        return null;
-      }
-      try {
-        emitStartingMsg();
-        versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
-        for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
-          Optional<? extends UpgradeAction> action =
-              f.datanodeAction(ON_FINALIZE);
-          finalizeFeature(f, datanodeStateMachine.getLayoutStorage(), action);
-          updateLayoutVersionInVersionFile(f,
-              datanodeStateMachine.getLayoutStorage());
-          versionManager.finalized(f);
-        }
-        versionManager.completeFinalization();
-        datanodeStateMachine.postFinalizeUpgrade();
-        emitFinishedMsg();
-        return null;
-      } finally {
-        versionManager.setUpgradeState(FINALIZATION_DONE);
-        isDone = true;
-      }
+  @Override
+  public void postFinalizeUpgrade() throws  IOException {
+    datanodeStateMachine.postFinalizeUpgrade();
+  }
+
+  @Override
+  protected void finalizeUpgrade(Storage storageConfig)
+      throws UpgradeException {
+    for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
+      Optional<? extends UpgradeAction> action =
+          f.datanodeAction(ON_FINALIZE);
+      finalizeFeature(f, datanodeStateMachine.getLayoutStorage(), action);
+      updateLayoutVersionInVersionFile(f,
+          datanodeStateMachine.getLayoutStorage());
+      versionManager.finalized(f);
     }
+    versionManager.completeFinalization();
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 7700070..ffaaccd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -117,6 +117,7 @@ class BackgroundPipelineCreator {
     // TODO: #CLUTIL Different replication factor may need to be supported
 
     if(pausePipelineCreation.get()) {
+      LOG.info("Pipeline Creation is paused.");
       return;
     }
     HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index f6d4eb9..d88745b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1778,4 +1778,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
   ) throws IOException {
     return upgradeFinalizer.reportStatus(upgradeClientID, takeover);
   }
-}
\ No newline at end of file
+
+  public UpgradeFinalizer<StorageContainerManager> getUpgradeFinalizer() {
+    return upgradeFinalizer;
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
index 27dbb6b..1c8a3e4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -19,21 +19,18 @@
 package org.apache.hadoop.hdds.scm.server.upgrade;
 
 import static 
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 
-import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
 
 /**
  * UpgradeFinalizer for the Storage Container Manager service.
@@ -41,6 +38,8 @@ import 
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
 public class SCMUpgradeFinalizer extends
     BasicUpgradeFinalizer<StorageContainerManager, HDDSLayoutVersionManager> {
 
+  private StorageContainerManager storageContainerManager;
+
   public SCMUpgradeFinalizer(HDDSLayoutVersionManager versionManager) {
     super(versionManager);
   }
@@ -49,59 +48,52 @@ public class SCMUpgradeFinalizer extends
   public StatusAndMessages finalize(String upgradeClientID,
                                     StorageContainerManager scm)
       throws IOException {
+    storageContainerManager = scm;
     StatusAndMessages response = preFinalize(upgradeClientID, scm);
     if (response.status() != FINALIZATION_REQUIRED) {
       return response;
     }
-    new Worker(scm).call();
+    try {
+      getFinalizationExecutor().execute(scm.getScmStorageConfig(), this);
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new IOException(e.getMessage());
+    }
     return STARTING_MSG;
   }
 
-  private class Worker implements Callable<Void> {
-    private StorageContainerManager scm;
-
-    /**
-     * Initiates the Worker, for the specified SCM instance.
-     * @param scm the StorageContainerManager instance on which to finalize the
-     *           new LayoutFeatures.
+  @Override
+  public boolean preFinalizeUpgrade() throws IOException {
+    /*
+     * Before we can finalize the feature, we need to make sure that all
+     * existing pipelines are closed and that the pipeline manager has
+     * frozen all new pipeline creation.
      */
-    Worker(StorageContainerManager scm) {
-      this.scm = scm;
-    }
+    String msg = "  Existing pipelines and containers will be closed " +
+        "during Upgrade.";
+    msg += "\n  New pipelines creation will remain frozen until Upgrade " +
+        "is finalized.";
 
-    @Override
-    public Void call() throws IOException {
-      try {
-        emitStartingMsg();
-        versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
-        /*
-         * Before we can call finalize the feature, we need to make sure that
-         * all existing pipelines are closed and pipeline Manger would freeze
-         * all new pipeline creation.
-         */
-        String msg = "Existing pipelines and containers will be closed " +
-            "during upgrade.";
-        msg += "\nNew pipelines creation will remain frozen until upgrade " +
-            "is finalized.";
-        scm.preFinalizeUpgrade();
-        logAndEmit(msg);
-        SCMStorageConfig storage = scm.getScmStorageConfig();
+    storageContainerManager.preFinalizeUpgrade();
+    logAndEmit(msg);
+    return true;
+  }
 
-        for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
-          Optional<? extends UpgradeAction> action = f.scmAction(ON_FINALIZE);
-          finalizeFeature(f, storage, action);
-          updateLayoutVersionInVersionFile(f, storage);
-          versionManager.finalized(f);
-        }
-        versionManager.completeFinalization();
-        scm.postFinalizeUpgrade();
-        emitFinishedMsg();
-        return null;
-      } finally {
-        versionManager.setUpgradeState(FINALIZATION_DONE);
-        isDone = true;
-      }
+  @Override
+  protected void finalizeUpgrade(Storage storageConfig)
+      throws UpgradeException {
+    for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
+      Optional<? extends UpgradeAction> action = f.scmAction(ON_FINALIZE);
+      finalizeFeature(f, storageConfig, action);
+      updateLayoutVersionInVersionFile(f, storageConfig);
+      versionManager.finalized(f);
     }
+    versionManager.completeFinalization();
+  }
+
+  public void postFinalizeUpgrade() throws IOException {
+    storageContainerManager.postFinalizeUpgrade();
+    emitFinishedMsg();
   }
 
   @Override
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
index b5543d1..5c79d7a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
@@ -18,19 +18,29 @@
 
 package org.apache.hadoop.hdds.upgrade;
 
+import static java.lang.Thread.sleep;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
 import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
 import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.INITIAL_VERSION;
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_COMPLETE_FINALIZATION;
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE;
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE;
+import static 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.BEFORE_PRE_FINALIZE_UPGRADE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -38,6 +48,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -50,6 +61,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -59,14 +71,22 @@ import 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
+import org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor;
+import 
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
+import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -79,10 +99,10 @@ import org.slf4j.LoggerFactory;
 public class TestHDDSUpgrade {
 
   /**
-    * Set a timeout for each test.
-    */
+   * Set a timeout for each test.
+   */
   @Rule
-  public Timeout timeout = new Timeout(300000);
+  public Timeout timeout = new Timeout(11000000);
   private static final Logger LOG =
       LoggerFactory.getLogger(TestHDDSUpgrade.class);
   private static final int NUM_DATA_NODES = 3;
@@ -92,9 +112,9 @@ public class TestHDDSUpgrade {
   private StorageContainerManager scm;
   private ContainerManagerV2 scmContainerManager;
   private PipelineManager scmPipelineManager;
-  private Pipeline ratisPipeline1;
   private final int numContainersCreated = 1;
   private HDDSLayoutVersionManager scmVersionManager;
+  private AtomicBoolean testPassed = new AtomicBoolean(true);
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -102,39 +122,75 @@ public class TestHDDSUpgrade {
    * @throws IOException
    */
   @Before
+  public void setUp() throws Exception {
+    // Build a fresh MiniOzoneCluster and cache the SCM handles for each test.
+    init();
+  }
+
+  /*
+   * Stops the mini cluster started for the test case, if there is one.
+   */
+  @After
+  public void tearDown() throws Exception {
+    shutdown();
+  }
+
   public void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1000,
-            TimeUnit.MILLISECONDS);
-
+        TimeUnit.MILLISECONDS);
     conf.set(OZONE_DATANODE_PIPELINE_LIMIT, "1");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(NUM_DATA_NODES)
         // allow only one FACTOR THREE pipeline.
         .setTotalPipelineNumLimit(NUM_DATA_NODES + 1)
-        .setHbInterval(1000)
-        .setHbProcessorInterval(1000)
+        .setHbInterval(500)
+        .setHbProcessorInterval(500)
         .setScmLayoutVersion(INITIAL_VERSION.layoutVersion())
         .setDnLayoutVersion(INITIAL_VERSION.layoutVersion())
         .build();
     cluster.waitForClusterToBeReady();
-    scm = cluster.getStorageContainerManager();
-    scmContainerManager = scm.getContainerManager();
-    scmPipelineManager = scm.getPipelineManager();
-    scmVersionManager = scm.getLayoutVersionManager();
-
+    loadSCMState();
   }
 
   /**
    * Shutdown MiniDFSCluster.
    */
-  @After
   public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
+  /*
+   * Some tests repeatedly modify the cluster (for example by restarting the
+   * SCM). Helper function to re-read the current SCM from the cluster and
+   * refresh the cached container manager, pipeline manager and layout
+   * version manager references taken from it.
+   */
+  private void loadSCMState(){
+    scm = cluster.getStorageContainerManager();
+    scmContainerManager = scm.getContainerManager();
+    scmPipelineManager = scm.getPipelineManager();
+    scmVersionManager = scm.getLayoutVersionManager();
+  }
+
+
+  /*
+   * Helper function to create a Key. It creates a volume, a bucket and a
+   * RATIS/THREE key that all share the same name, and writes that name as
+   * the key content.
+   *
+   * The RPC client and the key output stream are closed even when one of
+   * the calls fails, so the helper does not leak them.
+   */
+  private void createKey() throws IOException {
+    final String uniqueId = "testhddsupgrade";
+    try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
+      ObjectStore objectStore = client.getObjectStore();
+      objectStore.createVolume(uniqueId);
+      objectStore.getVolume(uniqueId).createBucket(uniqueId);
+      try (OzoneOutputStream key =
+          objectStore.getVolume(uniqueId).getBucket(uniqueId)
+              .createKey(uniqueId, 1024, ReplicationType.RATIS,
+                  ReplicationFactor.THREE, new HashMap<>())) {
+        key.write(uniqueId.getBytes(UTF_8));
+        key.flush();
+      }
+    }
+  }
+
+  /*
+   * Helper function to test Pre-Upgrade conditions on the SCM
+   */
   private void testPreUpgradeConditionsSCM() {
     Assert.assertEquals(INITIAL_VERSION.layoutVersion(),
         scmVersionManager.getMetadataLayoutVersion());
@@ -143,13 +199,29 @@ public class TestHDDSUpgrade {
     }
   }
 
+  /*
+   * Helper function to test Post-Upgrade conditions on the SCM
+   */
   private void testPostUpgradeConditionsSCM() {
+    loadSCMState();
     Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(),
         scmVersionManager.getMetadataLayoutVersion());
     Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1);
 
     // SCM should not return from finalization until there is at least one
     // pipeline to use.
+    try {
+      GenericTestUtils.waitFor(() -> {
+        int pipelineCount = scmPipelineManager.getPipelines(RATIS, THREE, OPEN)
+            .size();
+        if (pipelineCount >= 1) {
+          return true;
+        }
+        return false;
+      }, 500, 60000);
+    } catch (TimeoutException | InterruptedException e) {
+      Assert.fail("Timeout waiting for Upgrade to complete on SCM.");
+    }
     int pipelineCount = scmPipelineManager.getPipelines(RATIS, THREE, OPEN)
         .size();
     Assert.assertTrue(pipelineCount >= 1);
@@ -157,26 +229,32 @@ public class TestHDDSUpgrade {
     // SCM will not return from finalization until there is at least one
     // RATIS 3 pipeline. For this to exist, all three of our datanodes must
     // be in the HEALTHY state.
-    testDataNodesStateOnSCM(HEALTHY);
+    testDataNodesStateOnSCM(HEALTHY, HEALTHY_READONLY);
 
     int countContainers = 0;
     for (ContainerInfo ci : scmContainerManager.getContainers()) {
       HddsProtos.LifeCycleState ciState = ci.getState();
+      LOG.info("testPostUpgradeConditionsSCM: container state is {}",
+          ciState.name());
       Assert.assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
           (ciState == HddsProtos.LifeCycleState.CLOSING) ||
+          (ciState == HddsProtos.LifeCycleState.DELETING) ||
+          (ciState == HddsProtos.LifeCycleState.DELETED) ||
           (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
       countContainers++;
     }
-    Assert.assertEquals(numContainersCreated, countContainers);
+    Assert.assertTrue(countContainers >= numContainersCreated);
   }
 
+  /*
+   * Helper function to test Pre-Upgrade conditions on all the DataNodes.
+   */
   private void testPreUpgradeConditionsDataNodes() {
     for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
       DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
       HDDSLayoutVersionManager dnVersionManager =
           dsm.getLayoutVersionManager();
       Assert.assertEquals(0, dnVersionManager.getMetadataLayoutVersion());
-
     }
 
     int countContainers = 0;
@@ -194,14 +272,17 @@ public class TestHDDSUpgrade {
     Assert.assertTrue(countContainers >= 1);
   }
 
-
+  /*
+   * Helper function to test Post-Upgrade conditions on all the DataNodes.
+   */
   private void testPostUpgradeConditionsDataNodes() {
     try {
       GenericTestUtils.waitFor(() -> {
         for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
           DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
           try {
-            if (dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) {
+            if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) &&
+                (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) {
               return false;
             }
           } catch (IOException e) {
@@ -209,7 +290,7 @@ public class TestHDDSUpgrade {
           }
         }
         return true;
-      }, 2000, 20000);
+      }, 500, 60000);
     } catch (TimeoutException | InterruptedException e) {
       Assert.fail("Timeout waiting for Upgrade to complete on Data Nodes.");
     }
@@ -235,12 +316,36 @@ public class TestHDDSUpgrade {
     Assert.assertTrue(countContainers >= 1);
   }
 
-  private void testDataNodesStateOnSCM(NodeState state) {
+  /*
+   * Helper function to check that new pipelines can be created Post-Upgrade:
+   * a newly created and opened RATIS/THREE pipeline starts with no
+   * containers, and the next allocated container lands on that pipeline.
+   */
+  private void testPostUpgradePipelineCreation() throws IOException {
+    final Pipeline newPipeline =
+        scmPipelineManager.createPipeline(RATIS, THREE);
+    final PipelineID newPipelineId = newPipeline.getId();
+    scmPipelineManager.openPipeline(newPipelineId);
+    Assert.assertEquals(0,
+        scmPipelineManager.getNumberOfContainers(newPipelineId));
+
+    final ContainerInfo allocated =
+        scmContainerManager.allocateContainer(RATIS, THREE, "Owner1");
+    final PipelineID allocatedOn = allocated.getPipelineID();
+    Assert.assertEquals(1,
+        scmPipelineManager.getNumberOfContainers(allocatedOn));
+    Assert.assertEquals(allocatedOn, newPipelineId);
+  }
+
+  /*
+   * Helper function to test DataNode state on the SCM. Note that, due to
+   * timing, a node can sometimes have already transitioned to its next
+   * state. This function therefore expects every DataNode to be in either
+   * NodeState "state" or "alternateState". A test can enforce one exact
+   * NodeState by passing "alternateState = null".
+   */
+  private void testDataNodesStateOnSCM(NodeState state,
+                                       NodeState alternateState) {
     int countNodes = 0;
-    for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()){
+    for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) {
       try {
-        Assert.assertEquals(state,
-            scm.getScmNodeManager().getNodeStatus(dn).getHealth());
+        NodeState dnState =
+            scm.getScmNodeManager().getNodeStatus(dn).getHealth();
+        Assert.assertTrue((dnState == state) ||
+            (alternateState == null ? false : dnState == alternateState));
       } catch (NodeNotFoundException e) {
         e.printStackTrace();
         Assert.fail("Node not found");
@@ -250,31 +355,43 @@ public class TestHDDSUpgrade {
     Assert.assertEquals(NUM_DATA_NODES, countNodes);
   }
 
+  /*
+   * Helper function to wait for Pipeline creation.
+   */
   private void waitForPipelineCreated() throws Exception {
-    LambdaTestUtils.await(10000, 2000, () -> {
+    LambdaTestUtils.await(10000, 500, () -> {
       List<Pipeline> pipelines =
           scmPipelineManager.getPipelines(RATIS, THREE, OPEN);
       return pipelines.size() == 1;
     });
   }
 
-  @Test
-  public void testFinalizationFromInitialVersionToLatestVersion()
-      throws Exception {
-
-    waitForPipelineCreated();
-
-    // we will create CONTAINERS_CREATED_FOR_TESTING number of containers.
+  /*
+   * Helper function for container creation.
+   */
+  private void createTestContainers() throws IOException {
     XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
     ContainerInfo ci1 = scmContainerManager.allocateContainer(
         RATIS, THREE, "Owner1");
-    ratisPipeline1 = scmPipelineManager.getPipeline(ci1.getPipelineID());
+    Pipeline ratisPipeline1 =
+        scmPipelineManager.getPipeline(ci1.getPipelineID());
     scmPipelineManager.openPipeline(ratisPipeline1.getId());
     XceiverClientSpi client1 =
         xceiverClientManager.acquireClient(ratisPipeline1);
     ContainerProtocolCalls.createContainer(client1,
         ci1.getContainerID(), null);
     xceiverClientManager.releaseClient(client1, false);
+  }
+
+  /*
+   * Happy Path Test Case.
+   */
+  @Test
+  public void testFinalizationFromInitialVersionToLatestVersion()
+      throws Exception {
+
+    waitForPipelineCreated();
+    createTestContainers();
 
     // Test the Pre-Upgrade conditions on SCM as well as DataNodes.
     testPreUpgradeConditionsSCM();
@@ -312,6 +429,9 @@ public class TestHDDSUpgrade {
     // Verify Post-Upgrade conditions on the SCM.
     testPostUpgradeConditionsSCM();
 
+    // All datanodes on the SCM should be HEALTHY-READONLY, or already HEALTHY.
+    testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+
     // Verify the SCM has driven all the DataNodes through Layout Upgrade.
     testPostUpgradeConditionsDataNodes();
 
@@ -322,5 +442,624 @@ public class TestHDDSUpgrade {
     store.getVolume("vol1").createBucket("buc1");
     store.getVolume("vol1").getBucket("buc1").createKey("key1", 100,
         ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
+
+  }
+
+  /*
+   * All the subsequent tests here are failure cases. Some of the tests below
+   * can simultaneously fail one or more nodes at specific execution points
+   * and in different thread contexts.
+   * The key execution points of the upgrade path are defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+
+  /*
+   * Helper function to inject an SCM failure and an SCM restart at a given
+   * execution point during SCM-Upgrade.
+   *
+   * The SCM is restarted under the cluster lock and the cached SCM state is
+   * reloaded. A new finalizeUpgrade request is then issued against the
+   * restarted SCM from a separate thread; an IOException from that request
+   * is logged and marks the test as failed.
+   *
+   * Always returns true. NOTE(review): presumably "true" tells the
+   * injecting executor to abort the upgrade that was in progress - confirm
+   * against InjectedUpgradeFinalizationExecutor.
+   *
+   * Injects Failure in  : SCM
+   * Executing-Thread-Context : SCM-Upgrade
+   */
+  private Boolean injectSCMFailureDuringSCMUpgrade()
+      throws InterruptedException, TimeoutException, AuthenticationException,
+      IOException {
+    // For some tests this could get called in a different thread context.
+    // We need to guard concurrent updates to the cluster.
+    synchronized(cluster) {
+      cluster.restartStorageContainerManager(true);
+      loadSCMState();
+    }
+    // The ongoing current SCM Upgrade is getting aborted at this point. We
+    // need to schedule a new SCM Upgrade on a different thread context.
+    Thread t = new Thread(() -> {
+      try {
+        loadSCMState();
+        scm.finalizeUpgrade("xyz");
+      } catch (IOException e) {
+        // Report through the test logger so the cause ends up in the test
+        // log together with the rest of the output.
+        LOG.error("SCM finalizeUpgrade failed after injected SCM restart.",
+            e);
+        testPassed.set(false);
+      }
+    });
+    t.start();
+    return true;
+  }
+
+  /*
+   * Helper function to inject DataNode failures and DataNode restarts at a
+   * given execution point during SCM-Upgrade. It restarts all the DataNodes
+   * in the cluster and is part of test cases that simulate multi-node
+   * failure at specific code-execution points during SCM Upgrade.
+   * This helper function should be called in the thread context of an
+   * SCM-Upgrade only. It returns false so that the currently ongoing SCM
+   * upgrade is not aborted: this failure injection does not fail the SCM
+   * node and only impacts DataNodes, so we do not need to schedule another
+   * scm-finalize-upgrade here. Any exception raised while restarting is
+   * logged and marks the test as failed.
+   *
+   * Injects Failure in  : All the DataNodes
+   * Executing-Thread-Context : SCM-Upgrade
+   */
+  private Boolean injectDataNodeFailureDuringSCMUpgrade() {
+    try {
+      // Work on a Copy of current set of DataNodes to avoid
+      // running into tricky situations.
+      List<HddsDatanodeService> currentDataNodes =
+          new ArrayList<>(cluster.getHddsDatanodes());
+      for (HddsDatanodeService ds: currentDataNodes) {
+        DatanodeDetails dn = ds.getDatanodeDetails();
+        cluster.restartHddsDatanode(dn, false);
+      }
+      cluster.waitForClusterToBeReady();
+    } catch (Exception e) {
+      // Keep the cause: without it a failed restart cannot be diagnosed.
+      LOG.error("DataNode Restarts Failed!", e);
+      testPassed.set(false);
+    }
+    loadSCMState();
+    // returning false from injection function, continues currently ongoing
+    // SCM-Upgrade-Finalization.
+    return false;
+  }
+
+  /*
+   * Helper function to inject a DataNode failure and restart for a specific
+   * DataNode. This injection function can target a specific DataNode and
+   * thus facilitates getting called in the upgrade-finalization thread context
+   * of that specific DataNode.
+   *
+   * Returns a thread that has NOT been started yet; the caller starts it at
+   * the chosen injection point. When run, the thread restarts the given
+   * DataNode under the cluster lock. A failed restart is logged and marks
+   * the test as failed.
+   *
+   * Injects Failure in  : Given DataNode
+   * Executing-Thread-Context : the same DataNode that we are failing here.
+   */
+  private Thread injectDataNodeFailureDuringDataNodeUpgrade(
+      DatanodeDetails dn) {
+    // Schedule the DataNode restart on a separate thread context
+    // otherwise DataNode restart will hang. Also any cluster modification
+    // needs to be guarded since it could get modified in multiple
+    // independent threads.
+    return new Thread(() -> {
+      try {
+        synchronized (cluster) {
+          cluster.restartHddsDatanode(dn, true);
+        }
+      } catch (Exception e) {
+        LOG.error("DataNode Restart Failed!", e);
+        testPassed.set(false);
+      }
+    });
+  }
+
+  /*
+   * Helper function to inject coordinated failures and restarts across
+   * all the DataNodes as well as the SCM. This can help create targeted test
+   * cases to inject such comprehensive failures in SCM-Upgrade-Context as
+   * well as DataNode-Upgrade-Context.
+   *
+   * Returns a thread that has NOT been started yet; the caller starts it at
+   * the chosen injection point. When run, the thread restarts every
+   * DataNode and then the SCM under the cluster lock and waits for the
+   * cluster to be ready. A failure is logged and marks the test as failed.
+   *
+   * Injects Failure in  : SCM as well as ALL the DataNodes.
+   * Executing-Thread-Context : Either the SCM-Upgrade-Finalizer or the
+   *                            DataNode-Upgrade-Finalizer.
+   */
+  private Thread injectSCMAndDataNodeFailureTogetherAtTheSameTime()
+      throws InterruptedException, TimeoutException, AuthenticationException,
+      IOException {
+    // This needs to happen in a separate thread context otherwise
+    // DataNode restart will hang.
+    return new Thread(() -> {
+      try {
+        // Since we are modifying cluster in an independent thread context,
+        // we synchronize access to it to avoid concurrent modification
+        // exception.
+        synchronized (cluster) {
+          // Work on a Copy of current set of DataNodes to avoid
+          // running into tricky situations.
+          List<HddsDatanodeService> currentDataNodes =
+              new ArrayList<>(cluster.getHddsDatanodes());
+          for (HddsDatanodeService ds: currentDataNodes) {
+            DatanodeDetails dn = ds.getDatanodeDetails();
+            cluster.restartHddsDatanode(dn, false);
+          }
+          cluster.restartStorageContainerManager(false);
+          cluster.waitForClusterToBeReady();
+        }
+      } catch (Exception e) {
+        LOG.error("Combined SCM and DataNode restart failed.", e);
+        testPassed.set(false);
+      }
+    });
+  }
+
+  /*
+   * We have various test cases to target single-node or multi-node failures
+   * below.
+   **/
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Fails and restarts the SCM when its upgrade finalization reaches the
+   * BEFORE_PRE_FINALIZE_UPGRADE injection point, then runs the common
+   * finalization-with-failure-injection checks. The injection points are
+   * defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresBeforeScmPreFinalizeUpgrade()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor injectingExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    injectingExecutor.configureTestInjectionFunction(
+        BEFORE_PRE_FINALIZE_UPGRADE,
+        () -> this.injectSCMFailureDuringSCMUpgrade());
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(injectingExecutor);
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Fails and restarts the SCM when its upgrade finalization reaches the
+   * AFTER_PRE_FINALIZE_UPGRADE injection point, then runs the common
+   * finalization-with-failure-injection checks. The injection points are
+   * defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresAfterScmPreFinalizeUpgrade()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor injectingExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    injectingExecutor.configureTestInjectionFunction(
+        AFTER_PRE_FINALIZE_UPGRADE,
+        () -> this.injectSCMFailureDuringSCMUpgrade());
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(injectingExecutor);
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Fails and restarts the SCM when its upgrade finalization reaches the
+   * AFTER_COMPLETE_FINALIZATION injection point, then runs the common
+   * finalization-with-failure-injection checks. The injection points are
+   * defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresAfterScmCompleteFinalization()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor injectingExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    injectingExecutor.configureTestInjectionFunction(
+        AFTER_COMPLETE_FINALIZATION,
+        () -> this.injectSCMFailureDuringSCMUpgrade());
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(injectingExecutor);
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Fails and restarts the SCM when its upgrade finalization reaches the
+   * AFTER_POST_FINALIZE_UPGRADE injection point, then runs the common
+   * finalization-with-failure-injection checks. The injection points are
+   * defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresAfterScmPostFinalizeUpgrade()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor injectingExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    injectingExecutor.configureTestInjectionFunction(
+        AFTER_POST_FINALIZE_UPGRADE,
+        () -> this.injectSCMFailureDuringSCMUpgrade());
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(injectingExecutor);
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Restarts all the DataNodes when the SCM upgrade finalization reaches
+   * the BEFORE_PRE_FINALIZE_UPGRADE injection point, then runs the common
+   * finalization-with-failure-injection checks. The injection points are
+   * defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresBeforeScmPreFinalizeUpgrade()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor injectingExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    injectingExecutor.configureTestInjectionFunction(
+        BEFORE_PRE_FINALIZE_UPGRADE,
+        () -> injectDataNodeFailureDuringSCMUpgrade());
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(injectingExecutor);
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test all DataNode failures During SCM Upgrade after execution point
+   * "PreFinalizeUpgrade" (the injection happens at
+   * AFTER_PRE_FINALIZE_UPGRADE). All meaningful Upgrade execution points
+   * are defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresAfterScmPreFinalizeUpgrade()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor scmFinalizationExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(scmFinalizationExecutor);
+    scmFinalizationExecutor.configureTestInjectionFunction(
+        AFTER_PRE_FINALIZE_UPGRADE,
+        () -> {
+          return injectDataNodeFailureDuringSCMUpgrade();
+        });
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Restarts all the DataNodes when the SCM upgrade finalization reaches
+   * the AFTER_COMPLETE_FINALIZATION injection point, then runs the common
+   * finalization-with-failure-injection checks. The injection points are
+   * defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresAfterScmCompleteFinalization()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor injectingExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    injectingExecutor.configureTestInjectionFunction(
+        AFTER_COMPLETE_FINALIZATION,
+        () -> injectDataNodeFailureDuringSCMUpgrade());
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(injectingExecutor);
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Restarts all the DataNodes when the SCM upgrade finalization reaches
+   * the AFTER_POST_FINALIZE_UPGRADE injection point, then runs the common
+   * finalization-with-failure-injection checks. The injection points are
+   * defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresAfterScmPostFinalizeUpgrade()
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor injectingExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    injectingExecutor.configureTestInjectionFunction(
+        AFTER_POST_FINALIZE_UPGRADE,
+        () -> injectDataNodeFailureDuringSCMUpgrade());
+    ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(injectingExecutor);
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * Single node(targeted DataNode) failure case:
+   * Thread-Context : DataNode-Upgrade.
+   *
+   * Fail the same DataNode that is going through Upgrade-processing at a
+   * specific code execution point. This test iterates over all the
+   * Upgrade execution points defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints and
+   * rebuilds the cluster after each one.
+   */
+  @Ignore
+  @Test
+  public void testDataNodeFailuresDuringDataNodeUpgrade()
+      throws Exception {
+    for (UpgradeTestInjectionPoints injectionPoint:
+        UpgradeTestInjectionPoints.values()) {
+      testPassed.set(true);
+      // Configure a given data node to fail itself when its
+      // corresponding Upgrade-Finalizer reaches a specific point in its
+      // execution.
+      HddsDatanodeService ds = cluster.getHddsDatanodes().get(1);
+      Thread failureInjectionThread =
+          injectDataNodeFailureDuringDataNodeUpgrade(ds.getDatanodeDetails());
+      InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor =
+          new InjectedUpgradeFinalizationExecutor();
+      dataNodeFinalizationExecutor.configureTestInjectionFunction(
+          injectionPoint, () -> {
+            failureInjectionThread.start();
+            return true;
+          });
+      ((BasicUpgradeFinalizer)ds.getDatanodeStateMachine()
+          .getUpgradeFinalizer())
+          .setFinalizationExecutor(dataNodeFinalizationExecutor);
+      testFinalizationWithFailuerInjectionHelper(failureInjectionThread);
+      Assert.assertTrue(testPassed.get());
+      // Start the next injection point from a pristine cluster.
+      synchronized (cluster) {
+        shutdown();
+        init();
+      }
+      LOG.info("testDataNodeFailuresDuringDataNodeUpgrade: " +
+          "Failure Injection Point {} passed.", injectionPoint.name());
+    }
+  }
+
+  /*
+   * Two nodes(SCM and a targeted DataNode) combination failure case:
+   * Thread-Contexts :
+   *          DataNode failure in its own DataNode-Upgrade-Context .
+   *          SCM failure in its own SCM-Upgrade-Context .
+   *
+   * Fail the same DataNode that is going through its own Upgrade-processing
+   * at a specific code execution point. Also fail the SCM when SCM is going
+   * through upgrade-finalization. This test covers all the combinations of
+   * SCM-Upgrade-execution points and DataNode-Upgrade-execution points.
+   */
+  @Ignore
+  @Test
+  public void testAllPossibleDataNodeFailuresAndSCMFailures()
+      throws Exception {
+    for (UpgradeTestInjectionPoints scmInjectionPoint :
+        UpgradeTestInjectionPoints.values()) {
+      for (UpgradeTestInjectionPoints datanodeInjectionPoint :
+          UpgradeTestInjectionPoints.values()) {
+        testPassed.set(true);
+
+        // The cluster (and with it the SCM) is rebuilt at the end of every
+        // iteration, so the SCM failure injection has to be installed on
+        // the current SCM for each combination. Installing it only once per
+        // SCM injection point would leave every combination after the first
+        // one running without any SCM failure.
+        InjectedUpgradeFinalizationExecutor scmFinalizationExecutor =
+            new InjectedUpgradeFinalizationExecutor();
+        scmFinalizationExecutor.configureTestInjectionFunction(
+            scmInjectionPoint,
+            () -> {
+              return this.injectSCMFailureDuringSCMUpgrade();
+            });
+        ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+            .setFinalizationExecutor(scmFinalizationExecutor);
+
+        // Configure a given data node to restart itself when its
+        // corresponding Upgrade-Finalizer reaches a specific point in its
+        // execution.
+        HddsDatanodeService ds = cluster.getHddsDatanodes().get(1);
+        Thread dataNodefailureInjectionThread =
+            injectDataNodeFailureDuringDataNodeUpgrade(
+                ds.getDatanodeDetails());
+        InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor =
+            new InjectedUpgradeFinalizationExecutor();
+        dataNodeFinalizationExecutor.configureTestInjectionFunction(
+            datanodeInjectionPoint, () -> {
+              dataNodefailureInjectionThread.start();
+              return true;
+            });
+        ((BasicUpgradeFinalizer)ds.getDatanodeStateMachine()
+            .getUpgradeFinalizer())
+            .setFinalizationExecutor(dataNodeFinalizationExecutor);
+        testFinalizationWithFailuerInjectionHelper(
+            dataNodefailureInjectionThread);
+        Assert.assertTrue(testPassed.get());
+        // Start the next combination from a pristine cluster.
+        synchronized (cluster) {
+          shutdown();
+          init();
+        }
+        LOG.info("testAllPossibleDataNodeFailuresAndSCMFailures: " +
+            "DataNode-Failure-Injection-Point={} with " +
+            "Scm-FailureInjection-Point={} passed.",
+            datanodeInjectionPoint.name(), scmInjectionPoint.name());
+      }
+    }
+  }
+
+  /*
+   * Multi node(SCM and all the DataNodes together at the same time)
+   * combination failure case:
+   * Thread-Contexts :
+   *          SCM-Upgrade-Finalizer-Context
+   *
+   * Fail all the DataNodes and the SCM together when the SCM is going
+   * through upgrade. This test covers all the SCM-Upgrade-execution points
+   * and rebuilds the cluster after each one.
+   */
+  @Ignore
+  @Test
+  public void testDataNodeAndSCMFailuresTogetherDuringSCMUpgrade()
+      throws Exception {
+    for (UpgradeTestInjectionPoints injectionPoint :
+        UpgradeTestInjectionPoints.values()) {
+      testPassed.set(true);
+      Thread helpingFailureInjectionThread =
+          injectSCMAndDataNodeFailureTogetherAtTheSameTime();
+      InjectedUpgradeFinalizationExecutor scmFinalizationExecutor =
+          new InjectedUpgradeFinalizationExecutor();
+      scmFinalizationExecutor.configureTestInjectionFunction(
+          injectionPoint, () -> {
+            helpingFailureInjectionThread.start();
+            return true;
+          });
+      ((BasicUpgradeFinalizer)scm.getUpgradeFinalizer())
+          .setFinalizationExecutor(scmFinalizationExecutor);
+      testFinalizationWithFailuerInjectionHelper(
+          helpingFailureInjectionThread);
+      Assert.assertTrue(testPassed.get());
+      // Start the next injection point from a pristine cluster.
+      synchronized (cluster) {
+        shutdown();
+        init();
+      }
+      LOG.info("testDataNodeAndSCMFailuresTogetherDuringSCMUpgrade: Failure " +
+          "Injection Point {} passed.", injectionPoint.name());
+    }
+  }
+
+  /*
+   * Two-node combination failure: the SCM and one targeted DataNode fail
+   * together, at the same time.
+   * Thread-Context: DataNode-Upgrade-Finalizer-Context.
+   *
+   * For each UpgradeTestInjectionPoints value, the finalization executor of
+   * the DataNode at index 1 is configured to start the combined
+   * failure-injection thread at that point; finalization is then validated
+   * and the cluster is re-initialized. Currently disabled via @Ignore.
+   */
+  @Ignore
+  @Test
+  public void testDataNodeAndSCMFailuresTogetherDuringDataNodeUpgrade()
+      throws Exception {
+    for (UpgradeTestInjectionPoints injectionPoint :
+        UpgradeTestInjectionPoints.values()) {
+      testPassed.set(true);
+      Thread helpingFailureInjectionThread =
+          injectSCMAndDataNodeFailureTogetherAtTheSameTime();
+      HddsDatanodeService ds = cluster.getHddsDatanodes().get(1);
+      InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor =
+          new InjectedUpgradeFinalizationExecutor();
+      dataNodeFinalizationExecutor.configureTestInjectionFunction(
+              injectionPoint, () -> {
+          helpingFailureInjectionThread.start();
+          return true;
+        });
+      ((BasicUpgradeFinalizer)ds.getDatanodeStateMachine()
+          .getUpgradeFinalizer())
+          .setFinalizationExecutor(dataNodeFinalizationExecutor);
+      testFinalizationWithFailuerInjectionHelper(helpingFailureInjectionThread);
+      Assert.assertTrue(testPassed.get());
+      synchronized (cluster) {
+        shutdown();
+        init();
+      }
+      LOG.info("testDataNodeAndSCMFailuresTogetherDuringDataNodeUpgrade: " +
+          "Failure Injection Point {} passed.", injectionPoint.name());
+    }
+  }
+
+  public void testFinalizationWithFailuerInjectionHelper(
+      Thread failureInjectionThread) throws Exception {
+    // Shared scenario: write data, finalize despite injected failures, verify.
+    waitForPipelineCreated();
+    createTestContainers();
+    createKey();
+
+    // Test the Pre-Upgrade conditions on SCM as well as DataNodes.
+    testPreUpgradeConditionsSCM();
+    testPreUpgradeConditionsDataNodes();
+
+    // Trigger Finalization on the SCM
+    StatusAndMessages status = scm.finalizeUpgrade("xyz");
+    Assert.assertEquals(STARTING_FINALIZATION, status.status());
+
+    // Wait for the failure-injection thread to finish its job; a null
+    // thread is allowed and simply skips the wait.
+    if (failureInjectionThread != null) {
+      failureInjectionThread.join();
+    }
+
+    // Wait for the Finalization to complete on the SCM.
+    // Failure injection could have restarted the SCM and it could be in
+    // ALREADY_FINALIZED state as well.
+    while ((status.status() != FINALIZATION_DONE) &&
+        (status.status() != ALREADY_FINALIZED)) {
+      loadSCMState();
+      status = scm.queryUpgradeFinalizationProgress("xyz", true);
+      if (status.status() == FINALIZATION_REQUIRED) {
+        status = scm.finalizeUpgrade("xyz");
+      }
+    }
+
+    // Verify Post-Upgrade conditions on the SCM; they are expected to hold
+    // even though failures were injected during finalization.
+    testPostUpgradeConditionsSCM();
+
+    // All datanodes on the SCM should have moved to HEALTHY-READONLY state.
+    // Due to timing constraint also allow a "HEALTHY" state.
+    loadSCMState();
+    testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+
+    // Need to wait for post finalization heartbeat from DNs.
+    LambdaTestUtils.await(600000, 500, () -> {
+      try {
+        loadSCMState();
+        testDataNodesStateOnSCM(HEALTHY, null);
+        sleep(100);
+      } catch (Throwable ex) {
+        LOG.info(ex.getMessage());
+        return false;
+      }
+      return true;
+    });
+
+    // Verify the SCM has driven all the DataNodes through Layout Upgrade.
+    testPostUpgradeConditionsDataNodes();
+
+    // Verify that new pipeline can be created with upgraded datanodes.
+    try {
+      testPostUpgradePipelineCreation();
+    } catch(SCMException e) {
+      // Pipeline creation may legitimately fail when every datanode already
+      // belongs to a pipeline; only a non-null pipeline set is asserted here.
+      for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
+        DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+        Set<PipelineID> pipelines =
+            scm.getScmNodeManager().getPipelines(dsm.getDatanodeDetails());
+        Assert.assertNotNull(pipelines);
+      }
+    }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
index 598b046..2b2ef7e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
@@ -20,21 +20,18 @@ package org.apache.hadoop.ozone.om.upgrade;
 
 import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
 import static org.apache.hadoop.ozone.OzoneConsts.LAYOUT_VERSION_KEY;
-import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
 import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 
 import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
 
 /**
  * UpgradeFinalizer implementation for the Ozone Manager service.
@@ -42,6 +39,7 @@ import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
 public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
     OMLayoutVersionManager> {
   private  static final OmUpgradeAction NOOP = a -> {};
+  private OzoneManager ozoneManager;
 
   public OMUpgradeFinalizer(OMLayoutVersionManager versionManager) {
     super(versionManager);
@@ -50,6 +48,7 @@ public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
   @Override
   public StatusAndMessages finalize(String upgradeClientID, OzoneManager om)
       throws IOException {
+    ozoneManager = om;
     StatusAndMessages response = preFinalize(upgradeClientID, om);
     if (response.status() != FINALIZATION_REQUIRED) {
       return response;
@@ -64,64 +63,36 @@ public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
     //    ExecutorService executor =
     //        Executors.newSingleThreadExecutor(r -> new Thread(threadName));
     //    executor.submit(new Worker(om));
-    new Worker(om).call();
+    try {
+      getFinalizationExecutor().execute(ozoneManager.getOmStorage(),
+          this);
+    } catch (Exception e) {
+      // Rethrow I/O failures as-is; wrap anything else instead of casting.
+      throw (e instanceof IOException) ? (IOException) e : new IOException(e);
+    }
     return STARTING_MSG;
   }
 
-  /**
-   * This class implements the finalization logic applied to every
-   * LayoutFeature that needs to be finalized.
-   *
-   * For the first approach this happens synchronously within the state machine
-   * during the FinalizeUpgrade request, but ideally this has to be moved to
-   * individual calls that are going into the StateMaching one by one.
-   * The prerequisits for this to happen in the background are the following:
-   * - more fine grained control for LayoutFeatures to prepare the
-   *    finalization outside the state machine, do the switch from old to new
-   *    logic inside the statemachine and apply the finalization, and then do
-   *    any cleanup necessary outside the state machine
-   * - a way to post a request to the state machine that is not part of the
-   *    client API, so basically not an OMRequest, but preferably an internal
-   *    request, which is posted from the leader OM to the follower OMs only.
-   * - ensure that there is a possibility to implement a rollback logic if
-   *    something goes wrong inside the state machine, to avoid OM stuck in an
-   *    intermediate state due to an error.
-   */
-  private class Worker implements Callable<Void> {
-    private OzoneManager ozoneManager;
+  @Override
+  protected void postFinalizeUpgrade() throws IOException {
+    return;
+  }
 
-    /**
-     * Initiates the Worker, for the specified OM instance.
-     * @param om the OzoneManager instance on which to finalize the new
-     *           LayoutFeatures.
-     */
-    Worker(OzoneManager om) {
-      ozoneManager = om;
+  @Override
+  protected void finalizeUpgrade(Storage storageConfig)
+      throws UpgradeException {
+    for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) {
+      Optional<? extends UpgradeAction> action = f.action(ON_FINALIZE);
+      finalizeFeature(f, storageConfig, action);
+      updateLayoutVersionInVersionFile(f, storageConfig);
+      versionManager.finalized(f);
     }
+    versionManager.completeFinalization();
+  }
 
-    @Override
-    public Void call() throws IOException {
-      try {
-        emitStartingMsg();
-        versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
-
-        OMStorage omStorage = ozoneManager.getOmStorage();
-
-        for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) {
-          Optional<? extends UpgradeAction> action = f.action(ON_FINALIZE);
-          finalizeFeature(f, omStorage, action);
-          updateLayoutVersionInVersionFile(f, ozoneManager.getOmStorage());
-          versionManager.finalized(f);
-        }
-
-        versionManager.completeFinalization();
-        emitFinishedMsg();
-        return null;
-      } finally {
-        versionManager.setUpgradeState(FINALIZATION_DONE);
-        isDone = true;
-      }
-    }
+  @Override
+  protected boolean preFinalizeUpgrade() throws IOException {
+    return true;
   }
 
   public void runPrefinalizeStateActions(Storage storage, OzoneManager om)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to