This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
     new c3e3b35  HDDS-4179. Implement post-finalize SCM logic. (#1611)
c3e3b35 is described below

commit c3e3b3541025c3107ae77287466e4082ad2cd11f
Author: prashantpogde <[email protected]>
AuthorDate: Thu Dec 3 09:48:58 2020 -0800

    HDDS-4179. Implement post-finalize SCM logic. (#1611)
---
 .../hdds/upgrade/HDDSLayoutFeatureCatalog.java     |   3 +-
 .../hadoop/hdds/upgrade/HDDSUpgradeAction.java     |   6 +-
 .../upgrade/AbstractLayoutVersionManager.java      |   9 +-
 .../ozone/upgrade/BasicUpgradeFinalizer.java       | 305 +++++++++++++++++++++
 .../hadoop/ozone/upgrade/UpgradeException.java     | 116 ++++++++
 .../hadoop/ozone/upgrade/UpgradeFinalizer.java     |  10 +
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   2 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  69 +++--
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   6 +
 .../scm/pipeline/BackgroundPipelineCreator.java    |  14 +
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   4 +
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  25 ++
 .../hdds/scm/server/StorageContainerManager.java   | 108 ++++++++
 .../server/upgrade/NewSCMFeatureUpgradeAction.java |  34 ---
 .../hdds/scm/server/upgrade/SCMUpgradeAction.java  |   4 +-
 .../scm/server/upgrade/SCMUpgradeFinalizer.java    |  96 +++++++
 .../ozone/om/upgrade/OMUpgradeFinalizer.java       | 253 +++--------------
 .../ozone/om/upgrade/TestOMUpgradeFinalizer.java   |  27 +-
 18 files changed, 802 insertions(+), 289 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
index 69832bd..5ef86af 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
@@ -39,7 +39,8 @@ public class HDDSLayoutFeatureCatalog {
 
     private int layoutVersion;
     private String description;
-    private Optional<HDDSUpgradeAction> hddsUpgradeAction = Optional.empty();
+    private Optional< ? extends HDDSUpgradeAction> hddsUpgradeAction =
+        Optional.empty();
 
     HDDSLayoutFeature(final int layoutVersion, String description) {
       this.layoutVersion = layoutVersion;
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
index 0808b0f..68ab666 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
@@ -23,5 +23,9 @@ import 
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
 /**
  * Upgrade Action for SCM and DataNodes.
  */
-public interface HDDSUpgradeAction<T> extends UpgradeAction<T> {
+public class HDDSUpgradeAction<T> implements UpgradeAction<T> {
+  @Override
+  public void executeAction(T arg) throws Exception {
+
+  }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 831e6a1..c3ecb54 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status;
 
 /**
  * Layout Version Manager containing generic method implementations.
@@ -44,7 +45,7 @@ public abstract class AbstractLayoutVersionManager<T extends 
LayoutFeature>
   protected TreeMap<Integer, T> features = new TreeMap<>();
   protected Map<String, T> featureMap = new HashMap<>();
   protected volatile boolean isInitialized = false;
-  protected volatile UpgradeFinalizer.Status currentUpgradeState =
+  protected volatile Status currentUpgradeState =
       FINALIZATION_REQUIRED;
 
   protected void init(int version, T[] lfs) throws IOException {
@@ -68,10 +69,14 @@ public abstract class AbstractLayoutVersionManager<T 
extends LayoutFeature>
         "AbstractLayoutVersionManager", this);
   }
 
-  public UpgradeFinalizer.Status getUpgradeState() {
+  public Status getUpgradeState() {
     return currentUpgradeState;
   }
 
+  public void setUpgradeState(Status status) {
+    currentUpgradeState = status;
+  }
+
   private void initializeFeatures(T[] lfs) {
     Arrays.stream(lfs).forEach(f -> {
       Preconditions.checkArgument(!featureMap.containsKey(f.name()));
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
new file mode 100644
index 0000000..3099cef
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.INVALID_REQUEST;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes;
+
+/**
+ * UpgradeFinalizer implementation for the Storage Container Manager service.
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class BasicUpgradeFinalizer<T, V extends AbstractLayoutVersionManager>
+    implements UpgradeFinalizer<T> {
+
+  protected V versionManager;
+  protected String clientID;
+  protected T component;
+
+  private Queue<String> msgs = new ConcurrentLinkedQueue<>();
+  protected boolean isDone = false;
+
+  public BasicUpgradeFinalizer(V versionManager) {
+    this.versionManager = versionManager;
+  }
+
+  public boolean isFinalizationDone() {
+    return isDone;
+  }
+
+  public synchronized StatusAndMessages preFinalize(String upgradeClientID,
+                                                    T id)
+      throws UpgradeException {
+    switch (versionManager.getUpgradeState()) {
+    case STARTING_FINALIZATION:
+      return STARTING_MSG;
+    case FINALIZATION_IN_PROGRESS:
+      return FINALIZATION_IN_PROGRESS_MSG;
+    case FINALIZATION_DONE:
+    case ALREADY_FINALIZED:
+      return FINALIZED_MSG;
+    default:
+      if (!versionManager.needsFinalization()) {
+        throw new UpgradeException("Upgrade found in inconsistent state. " +
+            "Upgrade state is FINALIZATION_REQUIRED while MLV has been " +
+            "upgraded to SLV.", INVALID_REQUEST);
+      }
+      versionManager.setUpgradeState(STARTING_FINALIZATION);
+
+      clientID = upgradeClientID;
+      this.component = id;
+      return FINALIZATION_REQUIRED_MSG;
+    }
+  }
+
+  /*
+   * This method must be overriden by the component implementing the
+   * finalization logic.
+   */
+  public StatusAndMessages finalize(String upgradeClientID, T id)
+      throws IOException {
+    StatusAndMessages response = preFinalize(upgradeClientID, id);
+    if (response.status() != FINALIZATION_REQUIRED) {
+      return response;
+    }
+
+    /**
+     * Overriding class should schedule actual finalization logic
+     * in a separate thread here.
+     */
+    return STARTING_MSG;
+  }
+
+  @Override
+  public synchronized StatusAndMessages reportStatus(
+      String upgradeClientID, boolean takeover
+  ) throws UpgradeException {
+    if (takeover) {
+      clientID = upgradeClientID;
+    }
+    assertClientId(upgradeClientID);
+    List<String> returningMsgs = new ArrayList<>(msgs.size()+10);
+    Status status = versionManager.getUpgradeState();
+    while (msgs.size() > 0) {
+      returningMsgs.add(msgs.poll());
+    }
+    return new StatusAndMessages(status, returningMsgs);
+  }
+
+  private void assertClientId(String id) throws UpgradeException {
+    if (!this.clientID.equals(id)) {
+      throw new UpgradeException("Unknown client tries to get finalization " +
+          "status.\n The requestor is not the initiating client of the " +
+          "finalization, if you want to take over, and get unsent status " +
+          "messages, check -takeover option.", INVALID_REQUEST);
+    }
+  }
+
+  protected void finalizeFeature(LayoutFeature feature, Storage config)
+      throws UpgradeException {
+    Optional<? extends UpgradeAction> action = feature.onFinalizeAction();
+
+    if (!action.isPresent()) {
+      emitNOOPMsg(feature.name());
+      return;
+    }
+
+    putFinalizationMarkIntoVersionFile(feature, config);
+
+    emitStartingFinalizationActionMsg(feature.name());
+    try {
+      UpgradeAction<T> newaction = action.get();
+      newaction.executeAction(component);
+    } catch (Exception e) {
+      logFinalizationFailureAndThrow(e, feature.name());
+    }
+    emitFinishFinalizationActionMsg(feature.name());
+
+    removeFinalizationMarkFromVersionFile(feature, config);
+  }
+
+  protected void updateLayoutVersionInVersionFile(LayoutFeature feature,
+                                                  Storage config)
+      throws UpgradeException {
+    int prevLayoutVersion = currentStoredLayoutVersion(config);
+
+    updateStorageLayoutVersion(feature.layoutVersion(), config);
+    try {
+      persistStorage(config);
+    } catch (IOException e) {
+      updateStorageLayoutVersion(prevLayoutVersion, config);
+      logLayoutVersionUpdateFailureAndThrow(e);
+    }
+  }
+
+  protected void putFinalizationMarkIntoVersionFile(LayoutFeature feature,
+                                                  Storage config)
+      throws UpgradeException {
+    try {
+      emitUpgradeToLayoutVersionPersistingMsg(feature.name());
+
+      setUpgradeToLayoutVersionInStorage(feature.layoutVersion(), config);
+      persistStorage(config);
+
+      emitUpgradeToLayoutVersionPersistedMsg();
+    } catch (IOException e) {
+      logUpgradeToLayoutVersionPersistingFailureAndThrow(feature.name(), e);
+    }
+  }
+
+  protected void removeFinalizationMarkFromVersionFile(
+      LayoutFeature feature, Storage config) throws UpgradeException {
+    try {
+      emitRemovingUpgradeToLayoutVersionMsg(feature.name());
+
+      unsetUpgradeToLayoutVersionInStorage(config);
+      persistStorage(config);
+
+      emitRemovedUpgradeToLayoutVersionMsg();
+    } catch (IOException e) {
+      logUpgradeToLayoutVersionRemovalFailureAndThrow(feature.name(), e);
+    }
+  }
+
+  private void setUpgradeToLayoutVersionInStorage(int version,
+                                                  Storage config) {
+    config.setUpgradeToLayoutVersion(version);
+  }
+
+  private void unsetUpgradeToLayoutVersionInStorage(Storage config) {
+    config.unsetUpgradeToLayoutVersion();
+  }
+
+  private int currentStoredLayoutVersion(Storage config) {
+    return config.getLayoutVersion();
+  }
+
+  private void updateStorageLayoutVersion(int version, Storage config) {
+    config.setLayoutVersion(version);
+  }
+
+  private void persistStorage(Storage config) throws IOException {
+    config.persistCurrentState();
+  }
+
+  protected void emitNOOPMsg(String feature) {
+    String msg = "No finalization work defined for feature: " + feature + ".";
+    String msg2 = "Skipping.";
+
+    logAndEmit(msg);
+    logAndEmit(msg2);
+  }
+
+  protected void emitStartingMsg() {
+    String msg = "Finalization started.";
+    logAndEmit(msg);
+  }
+
+  protected void emitFinishedMsg() {
+    String msg = "Finalization is done.";
+    logAndEmit(msg);
+  }
+
+  protected void emitStartingFinalizationActionMsg(String feature) {
+    String msg = "Executing finalization of feature: " + feature + ".";
+    logAndEmit(msg);
+  }
+
+  protected void emitFinishFinalizationActionMsg(String feature) {
+    String msg = "The feature " + feature + " is finalized.";
+    logAndEmit(msg);
+  }
+
+  private void emitUpgradeToLayoutVersionPersistingMsg(String feature) {
+    String msg = "Mark finalization of " + feature + " in VERSION file.";
+    logAndEmit(msg);
+  }
+
+  private void emitUpgradeToLayoutVersionPersistedMsg() {
+    String msg = "Finalization mark placed.";
+    logAndEmit(msg);
+  }
+
+  private void emitRemovingUpgradeToLayoutVersionMsg(String feature) {
+    String msg = "Remove finalization mark of " + feature
+        + " feature from VERSION file.";
+    logAndEmit(msg);
+  }
+
+  private void emitRemovedUpgradeToLayoutVersionMsg() {
+    String msg = "Finalization mark removed.";
+    logAndEmit(msg);
+  }
+
+  private void logAndEmit(String msg) {
+    LOG.info(msg);
+    msgs.offer(msg);
+  }
+
+  protected void logFinalizationFailureAndThrow(Exception e, String feature)
+      throws UpgradeException {
+    String msg = "Error during finalization of " + feature + ".";
+    logAndThrow(e, msg, LAYOUT_FEATURE_FINALIZATION_FAILED);
+  }
+
+  private void logLayoutVersionUpdateFailureAndThrow(IOException e)
+      throws UpgradeException {
+    String msg = "Updating the LayoutVersion in the VERSION file failed.";
+    logAndThrow(e, msg, UPDATE_LAYOUT_VERSION_FAILED);
+  }
+
+  private void logUpgradeToLayoutVersionPersistingFailureAndThrow(
+      String feature, IOException e
+  ) throws UpgradeException {
+    String msg = "Failed to update VERSION file with the upgrading feature: "
+        + feature + ".";
+    logAndThrow(e, msg, PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED);
+  }
+
+  private void logUpgradeToLayoutVersionRemovalFailureAndThrow(
+      String feature, IOException e) throws UpgradeException {
+    String msg =
+        "Failed to unmark finalization of " + feature + " LayoutFeature.";
+    logAndThrow(e, msg, REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED);
+  }
+
+  private void logAndThrow(Exception e, String msg, ResultCodes resultCode)
+      throws UpgradeException {
+    LOG.error(msg, e);
+    throw new UpgradeException(msg, e, resultCode);
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeException.java
new file mode 100644
index 0000000..7b3a5c0
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeException.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.upgrade;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when upgrade fails.
+ */
+public class UpgradeException extends IOException {
+
+  public static final String STATUS_CODE = "STATUS_CODE=";
+  private final UpgradeException.ResultCodes result;
+
+  /**
+   * Constructs an {@code IOException} with {@code null}
+   * as its error detail message.
+   */
+  public UpgradeException(UpgradeException.ResultCodes result) {
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified detail message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   * the
+   * {@link #getMessage()} method)
+   */
+  public UpgradeException(String message, UpgradeException.ResultCodes result) 
{
+    super(message);
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified detail message
+   * and cause.
+   * <p>
+   * <p> Note that the detail message associated with {@code cause} is
+   * <i>not</i> automatically incorporated into this exception's detail
+   * message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   * the
+   * {@link #getMessage()} method)
+   * @param cause The cause (which is saved for later retrieval by the {@link
+   * #getCause()} method).  (A null value is permitted, and indicates that the
+   * cause is nonexistent or unknown.)
+   * @since 1.6
+   */
+  public UpgradeException(String message, Throwable cause,
+                          UpgradeException.ResultCodes result) {
+    super(message, cause);
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified cause and a
+   * detail message of {@code (cause==null ? null : cause.toString())}
+   * (which typically contains the class and detail message of {@code cause}).
+   * This constructor is useful for IO exceptions that are little more
+   * than wrappers for other throwables.
+   *
+   * @param cause The cause (which is saved for later retrieval by the {@link
+   * #getCause()} method).  (A null value is permitted, and indicates that the
+   * cause is nonexistent or unknown.)
+   * @since 1.6
+   */
+  public UpgradeException(Throwable cause,
+                          UpgradeException.ResultCodes result) {
+    super(cause);
+    this.result = result;
+  }
+
+  /**
+   * Returns resultCode.
+   * @return ResultCode
+   */
+  public UpgradeException.ResultCodes getResult() {
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return result + " " + super.toString();
+  }
+  /**
+   * Error codes to make it easy to decode these exceptions.
+   */
+  public enum ResultCodes {
+
+    OK,
+
+    INVALID_REQUEST,
+
+    PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED,
+    REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED,
+    UPDATE_LAYOUT_VERSION_FAILED,
+    LAYOUT_FEATURE_FINALIZATION_FAILED;
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
index 5568d59..64e83c7 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
@@ -112,6 +112,16 @@ public interface UpgradeFinalizer<T> {
       Arrays.asList("Starting Finalization")
   );
 
+  StatusAndMessages FINALIZATION_IN_PROGRESS_MSG = new StatusAndMessages(
+      Status.FINALIZATION_IN_PROGRESS,
+      Arrays.asList("Finalization in progress")
+  );
+
+  StatusAndMessages FINALIZATION_REQUIRED_MSG = new StatusAndMessages(
+      Status.FINALIZATION_REQUIRED,
+      Arrays.asList("Finalization required")
+  );
+
   /**
    * Default message to provide when the service is in ALREADY_FINALIZED state.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 3952565..48c9e04 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -235,4 +235,6 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
   default HDDSLayoutVersionManager getLayoutVersionManager(){
     return null;
   }
+
+  default void forceNodesToHealthyReadOnly() { }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 9cc403f..a01cca73 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -56,6 +56,13 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONED;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONING;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
@@ -166,7 +173,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
     this.state2EventMap = new HashMap<>();
     initialiseState2EventMap();
     Set<NodeState> finalStates = new HashSet<>();
-    finalStates.add(NodeState.DECOMMISSIONED);
+    finalStates.add(DECOMMISSIONED);
     // All DataNodes should start in HealthyReadOnly state.
     this.stateMachine = new StateMachine<>(NodeState.HEALTHY_READONLY,
         finalStates);
@@ -194,10 +201,10 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * Populates state2event map.
    */
   private void initialiseState2EventMap() {
-    state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
-    state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
+    state2EventMap.put(STALE, SCMEvents.STALE_NODE);
+    state2EventMap.put(DEAD, SCMEvents.DEAD_NODE);
     state2EventMap
-        .put(NodeState.HEALTHY, SCMEvents.READ_ONLY_HEALTHY_TO_HEALTHY_NODE);
+        .put(HEALTHY, SCMEvents.READ_ONLY_HEALTHY_TO_HEALTHY_NODE);
     state2EventMap
         .put(NodeState.HEALTHY_READONLY,
             SCMEvents.NON_HEALTHY_TO_READONLY_HEALTHY_NODE);
@@ -289,38 +296,38 @@ public class NodeStateManager implements Runnable, 
Closeable {
    */
   private void initializeStateMachine() {
     stateMachine.addTransition(
-        NodeState.HEALTHY_READONLY, NodeState.HEALTHY,
+        HEALTHY_READONLY, HEALTHY,
         NodeLifeCycleEvent.LAYOUT_MATCH);
     stateMachine.addTransition(
-        NodeState.HEALTHY_READONLY, NodeState.STALE,
+        HEALTHY_READONLY, STALE,
         NodeLifeCycleEvent.TIMEOUT);
     stateMachine.addTransition(
-        NodeState.HEALTHY_READONLY, NodeState.DECOMMISSIONING,
+        HEALTHY_READONLY, DECOMMISSIONING,
         NodeLifeCycleEvent.DECOMMISSION);
     stateMachine.addTransition(
-        NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
+        HEALTHY, STALE, NodeLifeCycleEvent.TIMEOUT);
     stateMachine.addTransition(
-        NodeState.HEALTHY, NodeState.HEALTHY_READONLY,
+        HEALTHY, HEALTHY_READONLY,
         NodeLifeCycleEvent.LAYOUT_MISMATCH);
     stateMachine.addTransition(
-        NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
+        STALE, DEAD, NodeLifeCycleEvent.TIMEOUT);
     stateMachine.addTransition(
-        NodeState.STALE, NodeState.HEALTHY_READONLY,
+        STALE, HEALTHY_READONLY,
         NodeLifeCycleEvent.RESTORE);
     stateMachine.addTransition(
-        NodeState.DEAD, NodeState.HEALTHY_READONLY,
+        DEAD, HEALTHY_READONLY,
         NodeLifeCycleEvent.RESURRECT);
     stateMachine.addTransition(
-        NodeState.HEALTHY, NodeState.DECOMMISSIONING,
+        HEALTHY, DECOMMISSIONING,
         NodeLifeCycleEvent.DECOMMISSION);
     stateMachine.addTransition(
-        NodeState.STALE, NodeState.DECOMMISSIONING,
+        STALE, DECOMMISSIONING,
         NodeLifeCycleEvent.DECOMMISSION);
     stateMachine.addTransition(
-        NodeState.DEAD, NodeState.DECOMMISSIONING,
+        DEAD, DECOMMISSIONING,
         NodeLifeCycleEvent.DECOMMISSION);
     stateMachine.addTransition(
-        NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
+        DECOMMISSIONING, DECOMMISSIONED,
         NodeLifeCycleEvent.DECOMMISSIONED);
 
   }
@@ -418,7 +425,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
    */
   public List<DatanodeInfo> getHealthyNodes() {
     List<DatanodeInfo> allHealthyNodes;
-    allHealthyNodes = getNodes(NodeState.HEALTHY);
+    allHealthyNodes = getNodes(HEALTHY);
     allHealthyNodes.addAll(getNodes(NodeState.HEALTHY_READONLY));
     return allHealthyNodes;
   }
@@ -429,7 +436,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return list of stale nodes
    */
   public List<DatanodeInfo> getStaleNodes() {
-    return getNodes(NodeState.STALE);
+    return getNodes(STALE);
   }
 
   /**
@@ -438,7 +445,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return list of dead nodes
    */
   public List<DatanodeInfo> getDeadNodes() {
-    return getNodes(NodeState.DEAD);
+    return getNodes(DEAD);
   }
 
   /**
@@ -500,7 +507,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return healthy node count
    */
   public int getHealthyNodeCount() {
-    return getNodeCount(NodeState.HEALTHY) +
+    return getNodeCount(HEALTHY) +
         getNodeCount(NodeState.HEALTHY_READONLY);
   }
 
@@ -510,7 +517,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return stale node count
    */
   public int getStaleNodeCount() {
-    return getNodeCount(NodeState.STALE);
+    return getNodeCount(STALE);
   }
 
   /**
@@ -519,7 +526,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return dead node count
    */
   public int getDeadNodeCount() {
-    return getNodeCount(NodeState.DEAD);
+    return getNodeCount(DEAD);
   }
 
   /**
@@ -629,6 +636,24 @@ public class NodeStateManager implements Runnable, 
Closeable {
     scheduleNextHealthCheck();
   }
 
+  public void forceNodesToHealthyReadOnly() {
+    try {
+      List<UUID> nodes = nodeStateMap.getNodes(HEALTHY);
+      for (UUID id : nodes) {
+        DatanodeInfo node = nodeStateMap.getNodeInfo(id);
+        nodeStateMap.updateNodeState(node.getUuid(), HEALTHY,
+            HEALTHY_READONLY);
+        if (state2EventMap.containsKey(HEALTHY_READONLY)) {
+          eventPublisher.fireEvent(state2EventMap.get(HEALTHY_READONLY),
+              node);
+        }
+      }
+    } catch (NodeNotFoundException ex) {
+      LOG.error("Inconsistent NodeStateMap! {}", nodeStateMap);
+      ex.printStackTrace();
+    }
+  }
+
   private void checkNodesHealth() {
 
     /*
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 86ef122..51c84dc 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -884,4 +884,10 @@ public class SCMNodeManager implements NodeManager {
   public HDDSLayoutVersionManager getLayoutVersionManager() {
     return scmLayoutVersionManager;
   }
+
+  @VisibleForTesting
+  @Override
+  public void forceNodesToHealthyReadOnly() {
+    nodeStateManager.forceNodesToHealthyReadOnly();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index f240293..18e974a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -46,15 +46,25 @@ class BackgroundPipelineCreator {
   private final PipelineManager pipelineManager;
   private final ConfigurationSource conf;
   private ScheduledFuture<?> periodicTask;
+  private AtomicBoolean pausePipelineCreation;
 
   BackgroundPipelineCreator(PipelineManager pipelineManager,
       Scheduler scheduler, ConfigurationSource conf) {
     this.pipelineManager = pipelineManager;
     this.conf = conf;
     this.scheduler = scheduler;
+    this.pausePipelineCreation = new AtomicBoolean(false);
     isPipelineCreatorRunning = new AtomicBoolean(false);
   }
 
+  public void pause() {
+    pausePipelineCreation.set(true);
+  }
+
+  public void resume() {
+    pausePipelineCreation.set(false);
+  }
+
   private boolean shouldSchedulePipelineCreator() {
     return isPipelineCreatorRunning.compareAndSet(false, true);
   }
@@ -105,6 +115,10 @@ class BackgroundPipelineCreator {
 
   private void createPipelines() {
     // TODO: #CLUTIL Different replication factor may need to be supported
+
+    if(pausePipelineCreation.get()) {
+      return;
+    }
     HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
         conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
             OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 0cb905e..ed0b047 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -125,4 +125,8 @@ public interface PipelineManager extends Closeable, 
PipelineManagerMXBean,
    * @return boolean
    */
   boolean getSafeModeStatus();
+
+  void freezePipelineCreation();
+
+  void resumePipelineCreation();
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index e0ea885..91c49c3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -90,6 +90,10 @@ public class SCMPipelineManager implements PipelineManager {
   // to prevent pipelines being created until sufficient nodes have registered.
   private final AtomicBoolean pipelineCreationAllowed;
 
+  // This allows for freezing/resuming the new pipeline creation while the
+  // SCM is already out of SafeMode.
+  private AtomicBoolean freezePipelineCreation;
+
   public SCMPipelineManager(ConfigurationSource conf,
       NodeManager nodeManager,
       Table<PipelineID, Pipeline> pipelineStore,
@@ -134,6 +138,9 @@ public class SCMPipelineManager implements PipelineManager {
     // Pipeline creation is only allowed after the safemode prechecks have
     // passed, eg sufficient nodes have registered.
     this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
+    // controls freezing/resuming pipeline creation regardless of SafeMode
+    // status.
+    this.freezePipelineCreation = new AtomicBoolean(false);
   }
 
   public PipelineStateManager getStateManager() {
@@ -266,6 +273,12 @@ public class SCMPipelineManager implements PipelineManager 
{
       throw new IOException("Pipeline creation is not allowed as safe mode " +
           "prechecks have not yet passed");
     }
+    if (freezePipelineCreation.get()) {
+      LOG.debug("Pipeline creation is frozen while an upgrade is in " +
+          "progress");
+      throw new IOException("Pipeline creation is frozen while an upgrade " +
+          "is in progress");
+    }
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -749,6 +762,18 @@ public class SCMPipelineManager implements PipelineManager 
{
     return this.isInSafeMode.get();
   }
 
+  @Override
+  public void freezePipelineCreation() {
+    freezePipelineCreation.set(true);
+    backgroundPipelineCreator.pause();
+  }
+
+  @Override
+  public void resumePipelineCreation() {
+    freezePipelineCreation.set(false);
+    backgroundPipelineCreator.resume();
+  }
+
   public Table<PipelineID, Pipeline> getPipelineStore() {
     return pipelineStore;
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 841426f..c0b4689 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
@@ -56,6 +57,7 @@ import 
org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
@@ -80,6 +82,8 @@ import 
org.apache.hadoop.hdds.scm.node.NonHealthyToReadOnlyHealthyNodeHandler;
 import org.apache.hadoop.hdds.scm.node.ReadOnlyHealthyToHealthyNodeHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler;
@@ -119,6 +123,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.protobuf.BlockingService;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
+import static 
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
+
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1157,4 +1163,106 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
   public HDDSLayoutVersionManager getLayoutVersionManager() {
     return scmLayoutVersionManager;
   }
+
+  private void waitForAllContainersToClose() {
+    boolean containersFound = true;
+    while (containersFound) {
+      containersFound = false;
+      for (DatanodeDetails datanodeDetails : scmNodeManager.getAllNodes()) {
+        try {
+          for (ContainerID id : scmNodeManager.getContainers(datanodeDetails)) 
{
+            try {
+              final ContainerInfo container = 
containerManager.getContainer(id);
+              if (container.getState() == HddsProtos.LifeCycleState.OPEN ||
+                  container.getState() == HddsProtos.LifeCycleState.CLOSING) {
+                containersFound = true;
+                if (container.getState() == HddsProtos.LifeCycleState.OPEN) {
+                  eventQueue.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
+                }
+              }
+            } catch (ContainerNotFoundException cnfe) {
+              LOG.warn("Container {} is not managed by ContainerManager.",
+                  id, cnfe);
+              continue;
+            }
+          }
+        } catch (NodeNotFoundException e) {
+          continue;
+        }
+      }
+      try {
+        if (containersFound) {
+          LOG.info("Waiting for all containers to close.");
+          Thread.sleep(5000);
+        }
+      } catch (InterruptedException e) {
+        continue;
+      }
+    }
+  }
+
+  private void waitForAllPipelinesToDestroy() throws IOException {
+    boolean pipelineFound = true;
+    while (pipelineFound) {
+      pipelineFound = false;
+      for (Pipeline pipeline : pipelineManager.getPipelines()) {
+        if (pipeline.getPipelineState() != CLOSED) {
+          pipelineFound = true;
+          pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+        }
+      }
+      try {
+        if (pipelineFound) {
+          LOG.info("Waiting for all pipelines to close.");
+          Thread.sleep(5000);
+        }
+      } catch (InterruptedException e) {
+        continue;
+      }
+    }
+  }
+
+  // This should be called in the context of a separate finalize upgrade 
thread.
+  // This function can block indefinitely till the conditions are met to safely
+  // finalize Upgrade.
+
+  public void preFinalizeUpgrade() throws IOException {
+    /**
+     * Ask pipeline manager to not create any new pipelines. Pipeline
+     * creation will remain frozen until postFinalizeUpgrade().
+     */
+    pipelineManager.freezePipelineCreation();
+
+    /**
+     * Ask all the existing data nodes to close any open containers and
+     * destroy existing pipelines
+     */
+    waitForAllPipelinesToDestroy();
+
+    /**
+     * We can not yet move all the existing data nodes to HEALTHY-READONLY
+     * state since the next heartbeat will move them back to HEALTHY state.
+     * This has to wait till postFinalizeUpgrade, when SCM MLV version is
+     * already upgraded as part of finalize processing.
+     * While in this state, it should be safe to do finalize processing for
+     * all new features. This will also update ondisk mlv version. Any
+     * disrupting upgrade can add a hook here to make sure that SCM is in a
+     * consistent state while finalizing the upgrade.
+     */
+  }
+
+  public void postFinalizeUpgrade() {
+    /**
+     * Don't wait for next heartbeat from datanodes in order to move them to
+     * Healthy-Readonly state. Force them to Healthy-ReadOnly state so that
+     * we can resume pipeline creation right away.
+     */
+    scmNodeManager.forceNodesToHealthyReadOnly();
+
+    /**
+     * Allow pipeline manager to create any new pipelines if it can
+     * find enough Healthy data nodes.
+     */
+    pipelineManager.resumePipelineCreation();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/NewSCMFeatureUpgradeAction.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/NewSCMFeatureUpgradeAction.java
deleted file mode 100644
index 88afad1..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/NewSCMFeatureUpgradeAction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.server.upgrade;
-
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
-
-/**
- * Example SCM Action class to help with understanding.
- */
-public class NewSCMFeatureUpgradeAction implements
-    HDDSUpgradeAction<StorageContainerManager> {
-
-  @Override
-  public void executeAction(StorageContainerManager scm) {
-    // Do blah....
-  }
-}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
index 809c3dd..8137756 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
 /**
  * Upgrade Action for StorageContainerManager which takes in an 'SCM' instance.
  */
-public interface SCMUpgradeAction<T> extends
+public class SCMUpgradeAction extends
     HDDSUpgradeAction<StorageContainerManager> {
+  public void executeAction(StorageContainerManager arg) throws Exception {
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
new file mode 100644
index 0000000..bf91ccd
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.upgrade;
+
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds
+    .upgrade.HDDSLayoutFeatureCatalog.HDDSLayoutFeature;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
+
+/**
+ * UpgradeFinalizer for the Storage Container Manager service.
+ */
+public class SCMUpgradeFinalizer extends
+    BasicUpgradeFinalizer<StorageContainerManager, HDDSLayoutVersionManager> {
+
+  public SCMUpgradeFinalizer(HDDSLayoutVersionManager versionManager) {
+    super(versionManager);
+  }
+
+  @Override
+  public StatusAndMessages finalize(String upgradeClientID,
+                                    StorageContainerManager scm)
+      throws IOException {
+    StatusAndMessages response = preFinalize(upgradeClientID, scm);
+    if (response.status() != FINALIZATION_REQUIRED) {
+      return response;
+    }
+    new Worker(scm).call();
+    return STARTING_MSG;
+  }
+
+  private class Worker implements Callable<Void> {
+    private StorageContainerManager storageContainerManager;
+
+    /**
+     * Initiates the Worker, for the specified SCM instance.
+     * @param scm the StorageContainerManager instance on which to finalize the
+     *           new LayoutFeatures.
+     */
+    Worker(StorageContainerManager scm) {
+      storageContainerManager = scm;
+    }
+
+    @Override
+    public Void call() throws IOException {
+      try {
+        emitStartingMsg();
+        versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
+        /*
+         * Before we can call finalize the feature, we need to make sure that
+         * all existing pipelines are closed and pipeline Manger would freeze
+         * all new pipeline creation.
+         */
+        storageContainerManager.preFinalizeUpgrade();
+
+        for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
+          finalizeFeature(f, storageContainerManager.getScmStorageConfig());
+          updateLayoutVersionInVersionFile(f,
+              storageContainerManager.getScmStorageConfig());
+          versionManager.finalized(f);
+        }
+        versionManager.completeFinalization();
+        storageContainerManager.postFinalizeUpgrade();
+        emitFinishedMsg();
+        return null;
+      } finally {
+        versionManager.setUpgradeState(FINALIZATION_DONE);
+        isDone = true;
+      }
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
index 4882fcf..0850cf1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
@@ -18,90 +18,48 @@
 
 package org.apache.hadoop.ozone.om.upgrade;
 
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.*;
-import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
-import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED;
-import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED;
-import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
 
 /**
  * UpgradeFinalizer implementation for the Ozone Manager service.
  */
-public class OMUpgradeFinalizer implements UpgradeFinalizer<OzoneManager> {
-
+public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
+    OMLayoutVersionManagerImpl> {
   private  static final OmUpgradeAction NOOP = a -> {};
 
-  private OMLayoutVersionManagerImpl versionManager;
-  private String clientID;
-
-  private Queue<String> msgs = new ConcurrentLinkedQueue<>();
-  private boolean isDone = false;
-
   public OMUpgradeFinalizer(OMLayoutVersionManagerImpl versionManager) {
-    this.versionManager = versionManager;
+    super(versionManager);
   }
 
   @Override
   public StatusAndMessages finalize(String upgradeClientID, OzoneManager om)
       throws IOException {
-    if (!versionManager.needsFinalization()) {
-      return FINALIZED_MSG;
-    }
-    clientID = upgradeClientID;
-
-// This requires some more investigation on how to do it properly while
-// requests are on the fly, and post finalize features one by one.
-// Until that is done, monitoring is not really doing anything meaningful
-// but this is a tradoff we can take for the first iteration either if needed,
-// as the finalization of the first few features should not take that long.
-// Follow up JIRA is in HDDS-4286
-//    String threadName = "OzoneManager-Upgrade-Finalizer";
-//    ExecutorService executor =
-//        Executors.newSingleThreadExecutor(r -> new Thread(threadName));
-//    executor.submit(new Worker(om));
+    StatusAndMessages response = preFinalize(upgradeClientID, om);
+    if (response.status() != FINALIZATION_REQUIRED) {
+      return response;
+    }
+    // This requires some more investigation on how to do it properly while
+    // requests are on the fly, and post finalize features one by one.
+    // Until that is done, monitoring is not really doing anything meaningful
+    // but this is a tradoff we can take for the first iteration either if
+    // needed, as the finalization of the first few features should not take
+    // that long. Follow up JIRA is in HDDS-4286
+    //    String threadName = "OzoneManager-Upgrade-Finalizer";
+    //    ExecutorService executor =
+    //        Executors.newSingleThreadExecutor(r -> new Thread(threadName));
+    //    executor.submit(new Worker(om));
     new Worker(om).call();
     return STARTING_MSG;
   }
 
-  @Override
-  public synchronized StatusAndMessages reportStatus(
-      String upgradeClientID, boolean takeover
-  ) throws IOException {
-    if (takeover) {
-      clientID = upgradeClientID;
-    }
-    assertClientId(upgradeClientID);
-    List<String> returningMsgs = new ArrayList<>(msgs.size()+10);
-    Status status = isDone ? FINALIZATION_DONE : FINALIZATION_IN_PROGRESS;
-    while (msgs.size() > 0) {
-      returningMsgs.add(msgs.poll());
-    }
-    return new StatusAndMessages(status, returningMsgs);
-  }
-
-  private void assertClientId(String id) throws OMException {
-    if (!this.clientID.equals(id)) {
-      throw new OMException("Unknown client tries to get finalization 
status.\n"
-          + "The requestor is not the initiating client of the finalization,"
-          + " if you want to take over, and get unsent status messages, check"
-          + " -takeover option.", INVALID_REQUEST);
-    }
-  }
-
   /**
    * This class implements the finalization logic applied to every
    * LayoutFeature that needs to be finalized.
@@ -134,26 +92,28 @@ public class OMUpgradeFinalizer implements 
UpgradeFinalizer<OzoneManager> {
     }
 
     @Override
-    public Void call() throws OMException {
+    public Void call() throws IOException {
       try {
         emitStartingMsg();
+        versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
 
         for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) {
           finalizeFeature(f);
-          updateLayoutVersionInVersionFile(f);
+          updateLayoutVersionInVersionFile(f, ozoneManager.getOmStorage());
           versionManager.finalized(f);
         }
 
         versionManager.completeFinalization();
         emitFinishedMsg();
+        return null;
       } finally {
+        versionManager.setUpgradeState(FINALIZATION_DONE);
         isDone = true;
       }
-      return null;
     }
 
     private void finalizeFeature(OMLayoutFeature feature)
-        throws OMException {
+        throws IOException {
       OmUpgradeAction action = feature.onFinalizeAction().orElse(NOOP);
 
       if (action == NOOP) {
@@ -161,7 +121,7 @@ public class OMUpgradeFinalizer implements 
UpgradeFinalizer<OzoneManager> {
         return;
       }
 
-      putFinalizationMarkIntoVersionFile(feature);
+      putFinalizationMarkIntoVersionFile(feature, ozoneManager.getOmStorage());
 
       emitStartingFinalizationActionMsg(feature.name());
       try {
@@ -171,159 +131,8 @@ public class OMUpgradeFinalizer implements 
UpgradeFinalizer<OzoneManager> {
       }
       emitFinishFinalizationActionMsg(feature.name());
 
-      removeFinalizationMarkFromVersionFile(feature);
-    }
-
-    private void updateLayoutVersionInVersionFile(OMLayoutFeature feature)
-        throws OMException {
-      int prevLayoutVersion = currentStoredLayoutVersion();
-
-      updateStorageLayoutVersion(feature.layoutVersion());
-      try {
-        persistStorage();
-      } catch (IOException e) {
-        updateStorageLayoutVersion(prevLayoutVersion);
-        logLayoutVersionUpdateFailureAndThrow(e);
-      }
-    }
-
-    private void putFinalizationMarkIntoVersionFile(OMLayoutFeature feature)
-        throws OMException {
-      try {
-        emitUpgradeToLayoutVersionPersistingMsg(feature.name());
-
-        setUpgradeToLayoutVersionInStorage(feature.layoutVersion());
-        persistStorage();
-
-        emitUpgradeToLayoutVersionPersistedMsg();
-      } catch (IOException e) {
-        logUpgradeToLayoutVersionPersistingFailureAndThrow(feature.name(), e);
-      }
-    }
-
-    private void removeFinalizationMarkFromVersionFile(OMLayoutFeature feature)
-        throws OMException {
-      try {
-        emitRemovingUpgradeToLayoutVersionMsg(feature.name());
-
-        unsetUpgradeToLayoutVersionInStorage();
-        persistStorage();
-
-        emitRemovedUpgradeToLayoutVersionMsg();
-      } catch (IOException e) {
-        logUpgradeToLayoutVersionRemovalFailureAndThrow(feature.name(), e);
-      }
-    }
-
-
-
-
-
-    private void setUpgradeToLayoutVersionInStorage(int version) {
-      ozoneManager.getOmStorage().setUpgradeToLayoutVersion(version);
-    }
-
-    private void unsetUpgradeToLayoutVersionInStorage() {
-      ozoneManager.getOmStorage().unsetUpgradeToLayoutVersion();
-    }
-
-    private int currentStoredLayoutVersion() {
-      return ozoneManager.getOmStorage().getLayoutVersion();
-    }
-
-    private void updateStorageLayoutVersion(int version) {
-      ozoneManager.getOmStorage().setLayoutVersion(version);
-    }
-
-    private void persistStorage() throws IOException {
-      ozoneManager.getOmStorage().persistCurrentState();
-    }
-
-    private void emitNOOPMsg(String feature) {
-      String msg = "No finalization work defined for feature: " + feature + 
".";
-      String msg2 = "Skipping.";
-
-      logAndEmit(msg);
-      logAndEmit(msg2);
-    }
-
-    private void emitStartingMsg() {
-      String msg = "Finalization started.";
-      logAndEmit(msg);
-    }
-
-    private void emitFinishedMsg() {
-      String msg = "Finalization is done.";
-      logAndEmit(msg);
-    }
-
-    private void emitStartingFinalizationActionMsg(String feature) {
-      String msg = "Executing finalization of feature: " + feature + ".";
-      logAndEmit(msg);
-    }
-
-    private void emitFinishFinalizationActionMsg(String feature) {
-      String msg = "The feature " + feature + " is finalized.";
-      logAndEmit(msg);
-    }
-
-    private void emitUpgradeToLayoutVersionPersistingMsg(String feature) {
-      String msg = "Mark finalization of " + feature + " in VERSION file.";
-      logAndEmit(msg);
-    }
-
-    private void emitUpgradeToLayoutVersionPersistedMsg() {
-      String msg = "Finalization mark placed.";
-      logAndEmit(msg);
-    }
-
-    private void emitRemovingUpgradeToLayoutVersionMsg(String feature) {
-      String msg = "Remove finalization mark of " + feature
-          + " feature from VERSION file.";
-      logAndEmit(msg);
-    }
-
-    private void emitRemovedUpgradeToLayoutVersionMsg() {
-      String msg = "Finalization mark removed.";
-      logAndEmit(msg);
-    }
-
-    private void logAndEmit(String msg) {
-      LOG.info(msg);
-      msgs.offer(msg);
-    }
-
-    private void logFinalizationFailureAndThrow(Exception e, String feature)
-        throws OMException {
-      String msg = "Error during finalization of " + feature + ".";
-      logAndThrow(e, msg, LAYOUT_FEATURE_FINALIZATION_FAILED);
-    }
-
-    private void logLayoutVersionUpdateFailureAndThrow(IOException e)
-        throws OMException {
-      String msg = "Updating the LayoutVersion in the VERSION file failed.";
-      logAndThrow(e, msg, UPDATE_LAYOUT_VERSION_FAILED);
-    }
-
-    private void logUpgradeToLayoutVersionPersistingFailureAndThrow(
-        String feature, IOException e
-    ) throws OMException {
-      String msg = "Failed to update VERSION file with the upgrading feature: "
-          + feature + ".";
-      logAndThrow(e, msg, PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED);
-    }
-
-    private void logUpgradeToLayoutVersionRemovalFailureAndThrow(
-        String feature, IOException e) throws OMException {
-      String msg =
-          "Failed to unmark finalization of " + feature + " LayoutFeature.";
-      logAndThrow(e, msg, REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED);
-    }
-
-    private void logAndThrow(Exception e, String msg, ResultCodes resultCode)
-        throws OMException {
-      LOG.error(msg, e);
-      throw new OMException(msg, e, resultCode);
+      removeFinalizationMarkFromVersionFile(feature,
+          ozoneManager.getOmStorage());
     }
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
index fbb7ce9..f185f8c 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.om.upgrade;
 
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.junit.Rule;
@@ -38,9 +38,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -73,8 +74,8 @@ public class TestOMUpgradeFinalizer {
 
   @Test
   public void testEmitsFinalizedStatusIfAlreadyFinalized() throws Exception {
-    when(versionManager.needsFinalization()).thenReturn(false);
 
+    when(versionManager.getUpgradeState()).thenReturn(ALREADY_FINALIZED);
     OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
     StatusAndMessages ret = finalizer.finalize(CLIENT_ID, null);
 
@@ -109,6 +110,10 @@ public class TestOMUpgradeFinalizer {
     OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
     finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
 
+
+    if (finalizer.isFinalizationDone()) {
+      when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+    }
     StatusAndMessages ret = finalizer.reportStatus(CLIENT_ID, false);
 
     assertEquals(UpgradeFinalizer.Status.FINALIZATION_DONE, ret.status());
@@ -123,6 +128,9 @@ public class TestOMUpgradeFinalizer {
     OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
     finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
 
+    if (finalizer.isFinalizationDone()) {
+      when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+    }
     StatusAndMessages ret = finalizer.reportStatus(OTHER_CLIENT_ID, true);
 
     assertEquals(UpgradeFinalizer.Status.FINALIZATION_DONE, ret.status());
@@ -137,7 +145,7 @@ public class TestOMUpgradeFinalizer {
     OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
     finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
 
-    exception.expect(OMException.class);
+    exception.expect(UpgradeException.class);
     exception.expectMessage("Unknown client");
 
     finalizer.reportStatus(OTHER_CLIENT_ID, false);
@@ -176,6 +184,9 @@ public class TestOMUpgradeFinalizer {
     verify(om.getOmStorage(), once())
         .setLayoutVersion(f.layoutVersion());
 
+    if (finalizer.isFinalizationDone()) {
+      when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+    }
     StatusAndMessages status = finalizer.reportStatus(CLIENT_ID, false);
     assertEquals(FINALIZATION_DONE, status.status());
     assertFalse(status.msgs().isEmpty());
@@ -199,13 +210,16 @@ public class TestOMUpgradeFinalizer {
       finalizer.finalize(CLIENT_ID, om);
       fail();
     } catch (Exception e) {
-      assertThat(e, instanceOf(OMException.class));
+      assertThat(e, instanceOf(UpgradeException.class));
       assertThat(e.getMessage(), containsString(lfs.iterator().next().name()));
       assertEquals(
-          ((OMException) e).getResult(),
+          ((UpgradeException) e).getResult(),
           LAYOUT_FEATURE_FINALIZATION_FAILED
       );
     }
+    if (finalizer.isFinalizationDone()) {
+      when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+    }
 
     // Verify that we have never removed the upgradeToLV from the storage
     // as finalization of the first feature in the list fails.
@@ -240,6 +254,7 @@ public class TestOMUpgradeFinalizer {
   private void setupVersionManagerMockToFinalize(
       Iterable<OMLayoutFeature> lfs
   ) {
+    when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_REQUIRED);
     when(versionManager.needsFinalization()).thenReturn(true);
     when(versionManager.unfinalizedFeatures()).thenReturn(lfs);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to