This is an automated email from the ASF dual-hosted git repository.

errose28 pushed a commit to branch HDDS-14496-zdu
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-14496-zdu by this push:
     new e1e15cb3e60 HDDS-15195. Switch Datanode to the new versioning 
framework (#10215)
e1e15cb3e60 is described below

commit e1e15cb3e6094139e14b71bd670d1840cd38d2f1
Author: Ethan Rose <[email protected]>
AuthorDate: Fri May 15 12:56:12 2026 -0400

    HDDS-15195. Switch Datanode to the new versioning framework (#10215)
---
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   4 +-
 ...nodeLayoutStorage.java => DatanodeStorage.java} |  10 +-
 .../common/statemachine/DatanodeStateMachine.java  |  53 ++-----
 ...ler.java => FinalizeVersionCommandHandler.java} |  40 +++---
 .../states/endpoint/HeartbeatEndpointTask.java     |  46 ++-----
 .../states/endpoint/RegisterEndpointTask.java      |  33 +----
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   6 +-
 .../upgrade/DataNodeUpgradeFinalizer.java          |  53 -------
 .../upgrade/DatanodeUpgradeActionProvider.java     |  41 ++++++
 .../container/upgrade/DatanodeVersionManager.java  |  70 ++++++++++
 .../ozone/container/upgrade/UpgradeUtils.java      |   8 +-
 .../upgrade/VersionedDatanodeFeatures.java         |  16 +--
 ...ionCommand.java => FinalizeVersionCommand.java} |  28 ++--
 .../ozone/container/common/ContainerTestUtils.java |   4 +-
 .../container/common/TestDatanodeStateMachine.java |   2 +-
 .../common/helpers/TestDatanodeIdYaml.java         |  10 +-
 .../states/endpoint/TestHeartbeatEndpointTask.java |  36 ++---
 ...TestDatanodeStartupInvalidApparentVersion.java} |  64 ++++-----
 .../TestDatanodeUpgradeToContainerIdsTable.java    |   8 +-
 .../upgrade/TestDatanodeUpgradeToHBaseSupport.java |   8 +-
 .../upgrade/TestDatanodeUpgradeToSchemaV3.java     |  30 ++--
 .../upgrade/TestDatanodeVersionManager.java        | 152 +++++++++++----------
 .../ozone/container/upgrade/UpgradeTestHelper.java |  22 +--
 .../hadoop/hdds/upgrade/HDDSVersionManager.java    |   8 +-
 .../hdds/upgrade/TestHDDSVersionManager.java       |  87 ------------
 .../AbstractComponentVersionManagerTest.java       |  12 ++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  27 +++-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |   4 +-
 .../hdds/scm/server/StorageContainerManager.java   |   2 -
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  52 +++----
 .../ozone/container/common/TestEndPoint.java       |  38 +++---
 .../hadoop/ozone/admin/upgrade/UpgradeUtil.java    |  41 ------
 .../datanode/schemaupgrade/UpgradeUtils.java       |   6 +-
 .../schemaupgrade/TestUpgradeContainerSchema.java  |   4 +-
 .../TestDNDataDistributionFinalization.java        |   4 +-
 .../hadoop/hdds/upgrade/TestHddsUpgradeUtils.java  |  84 ++----------
 .../hadoop/ozone/UniformDatanodesFactory.java      |   4 +-
 .../hadoop/ozone/om/upgrade/OMVersionManager.java  |   1 +
 .../ozone/om/upgrade/TestOMVersionManager.java     |   4 +
 .../hadoop/ozone/recon/api/TestEndpoints.java      |   5 +-
 .../hadoop/ozone/freon/DatanodeSimulator.java      |  14 +-
 41 files changed, 485 insertions(+), 656 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index e94cbeeae21..c5665a16a9c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -84,7 +84,7 @@
 import org.apache.hadoop.hdds.utils.HddsVersionInfo;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
@@ -287,7 +287,7 @@ public String getNamespace() {
         LOG.info("Hdds Datanode login successful.");
       }
 
-      DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+      DatanodeStorage layoutStorage = new DatanodeStorage(conf,
           datanodeDetails.getUuidString());
       if (layoutStorage.getState() != INITIALIZED) {
         layoutStorage.initialize();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DatanodeLayoutStorage.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DatanodeStorage.java
similarity index 92%
rename from 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DatanodeLayoutStorage.java
rename to 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DatanodeStorage.java
index 05dfe6ecc47..1b1be8df8b6 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DatanodeLayoutStorage.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DatanodeStorage.java
@@ -35,25 +35,25 @@
  * DataNodeStorageConfig is responsible for management of the
  * StorageDirectories used by the DataNode.
  */
-public class DatanodeLayoutStorage extends Storage {
+public class DatanodeStorage extends Storage {
   /**
    * Construct DataNodeStorageConfig.
    * @throws IOException if any directories are inaccessible.
    */
-  public DatanodeLayoutStorage(ConfigurationSource conf, String dataNodeId)
+  public DatanodeStorage(ConfigurationSource conf, String dataNodeId)
       throws IOException {
     super(NodeType.DATANODE, ServerUtils.getOzoneMetaDirPath(conf),
         DATANODE_LAYOUT_VERSION_DIR, dataNodeId, 
getDefaultLayoutVersion(conf));
   }
 
-  public DatanodeLayoutStorage(OzoneConfiguration conf, String dataNodeId,
-                               int layoutVersion)
+  public DatanodeStorage(OzoneConfiguration conf, String dataNodeId,
+                         int layoutVersion)
       throws IOException {
     super(NodeType.DATANODE, ServerUtils.getOzoneMetaDirPath(conf),
         DATANODE_LAYOUT_VERSION_DIR, dataNodeId, layoutVersion);
   }
 
-  public DatanodeLayoutStorage(ConfigurationSource conf)
+  public DatanodeStorage(ConfigurationSource conf)
       throws IOException {
     super(NodeType.DATANODE, ServerUtils.getOzoneMetaDirPath(conf),
         DATANODE_LAYOUT_VERSION_DIR, getDefaultLayoutVersion(conf));
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 977df48a552..02c396c5350 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -43,8 +43,6 @@
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import org.apache.hadoop.hdds.upgrade.DatanodeUpgradeActionProvider;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.NettyMetrics;
@@ -52,7 +50,7 @@
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.HddsDatanodeStopService;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
@@ -61,7 +59,7 @@
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CreatePipelineCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
-import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
+import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeVersionCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconcileContainerCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
@@ -82,11 +80,9 @@
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import 
org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
 import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
-import org.apache.hadoop.ozone.container.upgrade.DataNodeUpgradeFinalizer;
+import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.upgrade.UpgradeFinalization.StatusAndMessages;
-import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
@@ -117,9 +113,8 @@ public class DatanodeStateMachine implements Closeable {
 
   private final HddsDatanodeStopService hddsDatanodeStopService;
 
-  private final HDDSLayoutVersionManager layoutVersionManager;
-  private final DatanodeLayoutStorage layoutStorage;
-  private final DataNodeUpgradeFinalizer upgradeFinalizer;
+  private final DatanodeVersionManager versionManager;
+  private final DatanodeStorage storage;
 
   /**
    * Used to synchronize to the OzoneContainer object created in the
@@ -165,13 +160,11 @@ public DatanodeStateMachine(HddsDatanodeService 
hddsDatanodeService,
 
     Clock clock = Clock.system(ZoneId.systemDefault());
     // Expected to be initialized already.
-    layoutStorage = new DatanodeLayoutStorage(conf,
+    storage = new DatanodeStorage(conf,
         datanodeDetails.getUuidString());
 
-    layoutVersionManager = new HDDSLayoutVersionManager(
-        layoutStorage.getApparentVersion(), null, new 
DatanodeUpgradeActionProvider());
-    upgradeFinalizer = new DataNodeUpgradeFinalizer(layoutVersionManager);
-    VersionedDatanodeFeatures.initialize(layoutVersionManager);
+    versionManager = new DatanodeVersionManager(storage, this);
+    VersionedDatanodeFeatures.initialize(versionManager);
 
     String threadNamePrefix = datanodeDetails.threadNamePrefix();
     executorService = Executors.newFixedThreadPool(
@@ -277,7 +270,7 @@ public DatanodeStateMachine(HddsDatanodeService 
hddsDatanodeService,
             closePipelineCommandExecutorService))
         .addHandler(new CreatePipelineCommandHandler(conf,
             createPipelineCommandExecutorService))
-        .addHandler(new FinalizeNewLayoutVersionCommandHandler())
+        .addHandler(new FinalizeVersionCommandHandler())
         .addHandler(new RefreshVolumeUsageCommandHandler())
         .addHandler(new ReconcileContainerCommandHandler(supervisor, 
dnClient));
 
@@ -455,8 +448,8 @@ public void close() throws IOException {
     if (cmdProcessThread != null) {
       cmdProcessThread.interrupt();
     }
-    if (layoutVersionManager != null) {
-      layoutVersionManager.close();
+    if (versionManager != null) {
+      versionManager.close();
     }
     context.setState(DatanodeStates.getLastState());
     replicationSupervisorMetrics.unRegister();
@@ -748,29 +741,13 @@ public ReplicationSupervisor getSupervisor() {
     return supervisor;
   }
 
-  @VisibleForTesting
-  public HDDSLayoutVersionManager getLayoutVersionManager() {
-    return layoutVersionManager;
+  public DatanodeVersionManager getVersionManager() {
+    return versionManager;
   }
 
   @VisibleForTesting
-  public DatanodeLayoutStorage getLayoutStorage() {
-    return layoutStorage;
-  }
-
-  public StatusAndMessages finalizeUpgrade()
-      throws IOException {
-    return upgradeFinalizer.finalize(datanodeDetails.getUuidString(), this);
-  }
-
-  public StatusAndMessages queryUpgradeStatus()
-      throws IOException {
-    return upgradeFinalizer.reportStatus(datanodeDetails.getUuidString(),
-        true);
-  }
-
-  public UpgradeFinalizer<DatanodeStateMachine> getUpgradeFinalizer() {
-    return upgradeFinalizer;
+  public DatanodeStorage getStorage() {
+    return storage;
   }
 
   public ConfigurationSource getConf() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeVersionCommandHandler.java
similarity index 72%
rename from 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
rename to 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeVersionCommandHandler.java
index a27b94b76a3..a9df014b9ec 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeVersionCommandHandler.java
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_REQUIRED;
-
 import java.util.concurrent.atomic.AtomicLong;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.FinalizeNewLayoutVersionCommandProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -28,30 +26,32 @@
 import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import 
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.FinalizeVersionCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Handler for FinalizeNewLayoutVersion command received from SCM.
+ * Handler for FinalizeVersion command received from SCM.
  */
-public class FinalizeNewLayoutVersionCommandHandler implements CommandHandler {
+public class FinalizeVersionCommandHandler implements CommandHandler {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(FinalizeNewLayoutVersionCommandHandler.class);
+      LoggerFactory.getLogger(FinalizeVersionCommandHandler.class);
 
-  private AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicLong invocationCount = new AtomicLong(0);
   private final MutableRate opsLatencyMs;
 
   /**
-   * Constructs a FinalizeNewLayoutVersionCommandHandler.
+   * Constructs a FinalizeVersionCommandHandler.
    */
-  public FinalizeNewLayoutVersionCommandHandler() {
+  public FinalizeVersionCommandHandler() {
     MetricsRegistry registry = new MetricsRegistry(
-        FinalizeNewLayoutVersionCommandHandler.class.getSimpleName());
-    this.opsLatencyMs = 
registry.newRate(SCMCommandProto.Type.finalizeNewLayoutVersionCommand + "Ms");
+        FinalizeVersionCommandHandler.class.getSimpleName());
+    this.opsLatencyMs =
+        registry.newRate(SCMCommandProto.Type.finalizeNewLayoutVersionCommand 
+ "Ms");
   }
 
   /**
@@ -65,24 +65,20 @@ public FinalizeNewLayoutVersionCommandHandler() {
   @Override
   public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command.");
+    LOG.info("Processing FinalizeVersionCommandHandler command.");
     invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
     DatanodeStateMachine dsm = context.getParent();
     final FinalizeNewLayoutVersionCommandProto finalizeCommand =
-        ((FinalizeNewLayoutVersionCommand)command).getProto();
+        ((FinalizeVersionCommand) command).getProto();
     try {
       if (finalizeCommand.getFinalizeNewLayoutVersion()) {
-        // SCM is asking datanode to finalize
-        if (dsm.getLayoutVersionManager().getUpgradeState() ==
-            FINALIZATION_REQUIRED) {
-          // SCM will keep sending Finalize command until datanode mlv == slv
-          // we need to avoid multiple invocations of finalizeUpgrade.
-          LOG.info("Finalize Upgrade called!");
-          dsm.finalizeUpgrade();
+        if (dsm.getVersionManager().needsFinalization()) {
+          LOG.info("Finalize upgrade called.");
+          dsm.getVersionManager().finalizeUpgrade();
         }
       }
-    } catch (Exception e) {
+    } catch (UpgradeException e) {
       LOG.error("Exception during finalization.", e);
     } finally {
       long endTime = Time.monotonicNow();
@@ -107,7 +103,7 @@ public SCMCommandProto.Type getCommandType() {
    */
   @Override
   public int getInvocationCount() {
-    return (int)invocationCount.get();
+    return (int) invocationCount.get();
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 7cb24558c7c..c40fd317360 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -21,6 +21,7 @@
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_ACTION_MAX_LIMIT;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
 import static 
org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto;
 
 import com.google.common.base.Preconditions;
@@ -43,18 +44,18 @@
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import 
org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
-import 
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.FinalizeVersionCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
@@ -76,7 +77,7 @@ public class HeartbeatEndpointTask
   private StateContext context;
   private int maxContainerActionsPerHB;
   private int maxPipelineActionsPerHB;
-  private HDDSLayoutVersionManager layoutVersionManager;
+  private final DatanodeVersionManager versionManager;
 
   /**
    * Constructs a SCM heart beat.
@@ -84,22 +85,16 @@ public class HeartbeatEndpointTask
    * @param rpcEndpoint rpc Endpoint
    * @param conf Config.
    * @param context State context
-   * @param versionManager Layout version Manager
    */
   public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
-                               ConfigurationSource conf, StateContext context,
-                               HDDSLayoutVersionManager versionManager) {
+                               ConfigurationSource conf, StateContext context) 
{
     this.rpcEndpoint = rpcEndpoint;
     this.context = context;
     this.maxContainerActionsPerHB = 
conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
         HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
     this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT,
         HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT);
-    if (versionManager != null) {
-      this.layoutVersionManager = versionManager;
-    } else {
-      this.layoutVersionManager = 
context.getParent().getLayoutVersionManager();
-    }
+    this.versionManager = context.getParent().getVersionManager();
   }
 
   /**
@@ -135,8 +130,8 @@ public EndpointStateMachine.EndPointStates call() throws 
Exception {
       Preconditions.checkState(this.datanodeDetailsProto != null);
 
       LayoutVersionProto layoutinfo = toLayoutVersionProto(
-          layoutVersionManager.getMetadataLayoutVersion(),
-          layoutVersionManager.getSoftwareLayoutVersion());
+          versionManager.getApparentVersion().serialize(),
+          versionManager.getSoftwareVersion().serialize());
 
       requestBuilder = SCMHeartbeatRequestProto.newBuilder()
           .setDatanodeDetails(datanodeDetailsProto)
@@ -370,15 +365,12 @@ private void processResponse(SCMHeartbeatResponseProto 
response,
             setNodeOperationalStateCommand);
         break;
       case finalizeNewLayoutVersionCommand:
-        FinalizeNewLayoutVersionCommand finalizeNewLayoutVersionCommand =
-            FinalizeNewLayoutVersionCommand.getFromProtobuf(
-                
commandResponseProto.getFinalizeNewLayoutVersionCommandProto());
+        FinalizeVersionCommand finalizeVersionCommand =
+            
FinalizeVersionCommand.getFromProtobuf(commandResponseProto.getFinalizeNewLayoutVersionCommandProto());
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Received SCM finalize command {}",
-              finalizeNewLayoutVersionCommand.getId());
+          LOG.debug("Received SCM finalize command {}", 
finalizeVersionCommand.getId());
         }
-        processCommonCommand(commandResponseProto,
-            finalizeNewLayoutVersionCommand);
+        processCommonCommand(commandResponseProto, finalizeVersionCommand);
         break;
       case refreshVolumeUsageInfo:
         RefreshVolumeUsageCommand refreshVolumeUsageCommand =
@@ -444,7 +436,6 @@ public static class Builder {
     private ConfigurationSource conf;
     private DatanodeDetails datanodeDetails;
     private StateContext context;
-    private HDDSLayoutVersionManager versionManager;
 
     /**
      * Constructs the builder class.
@@ -463,17 +454,6 @@ public Builder 
setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
       return this;
     }
 
-    /**
-     * Sets the LayoutVersionManager.
-     *
-     * @param lvm config
-     * @return Builder
-     */
-    public Builder setLayoutVersionManager(HDDSLayoutVersionManager lvm) {
-      this.versionManager = lvm;
-      return this;
-    }
-
     /**
      * Sets the Config.
      *
@@ -526,7 +506,7 @@ public HeartbeatEndpointTask build() {
       }
 
       HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
-          .endPointStateMachine, this.conf, this.context, this.versionManager);
+          .endPointStateMachine, this.conf, this.context);
       task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
       return task;
     }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index 9055ee15757..0492f5da79b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -30,10 +30,10 @@
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ public final class RegisterEndpointTask implements
   private DatanodeDetails datanodeDetails;
   private final OzoneContainer datanodeContainerManager;
   private StateContext stateContext;
-  private HDDSLayoutVersionManager layoutVersionManager;
+  private final DatanodeVersionManager versionManager;
 
   /**
    * Creates a register endpoint task.
@@ -57,21 +57,15 @@ public final class RegisterEndpointTask implements
    * @param rpcEndPoint - endpoint
    * @param ozoneContainer - container
    * @param context - State context
-   * @param versionManager - layout version Manager
    */
   @VisibleForTesting
   public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
       OzoneContainer ozoneContainer,
-      StateContext context, HDDSLayoutVersionManager versionManager) {
+      StateContext context) {
     this.rpcEndPoint = rpcEndPoint;
     this.datanodeContainerManager = ozoneContainer;
     this.stateContext = context;
-    if (versionManager != null) {
-      this.layoutVersionManager = versionManager;
-    } else {
-      this.layoutVersionManager =
-          context.getParent().getLayoutVersionManager();
-    }
+    this.versionManager = context.getParent().getVersionManager();
   }
 
   /**
@@ -115,9 +109,9 @@ public EndpointStateMachine.EndPointStates call() throws 
Exception {
           .equals(EndpointStateMachine.EndPointStates.REGISTER)) {
         LayoutVersionProto layoutInfo = LayoutVersionProto.newBuilder()
             .setMetadataLayoutVersion(
-                layoutVersionManager.getMetadataLayoutVersion())
+                versionManager.getApparentVersion().serialize())
             .setSoftwareLayoutVersion(
-                layoutVersionManager.getSoftwareLayoutVersion())
+                versionManager.getSoftwareVersion().serialize())
             .build();
         ContainerReportsProto containerReport =
             datanodeContainerManager.getController().getContainerReport();
@@ -177,7 +171,6 @@ public static class Builder {
     private DatanodeDetails datanodeDetails;
     private OzoneContainer container;
     private StateContext context;
-    private HDDSLayoutVersionManager versionManager;
 
     /**
      * Constructs the builder class.
@@ -207,17 +200,6 @@ public Builder setConfig(ConfigurationSource config) {
       return this;
     }
 
-    /**
-     * Sets the LayoutVersionManager.
-     *
-     * @param lvm config
-     * @return Builder.
-     */
-    public Builder setLayoutVersionManager(HDDSLayoutVersionManager lvm) {
-      this.versionManager = lvm;
-      return this;
-    }
-
     /**
      * Sets the NodeID.
      *
@@ -277,8 +259,7 @@ public RegisterEndpointTask build() {
       }
 
       RegisterEndpointTask task = new RegisterEndpointTask(this
-          .endPointStateMachine, this.container, this.context,
-          this.versionManager);
+          .endPointStateMachine, this.container, this.context);
       task.setDatanodeDetails(datanodeDetails);
       return task;
     }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 27702064cf9..ca3a006bdda 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -69,7 +69,7 @@
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -542,8 +542,8 @@ public void start(String clusterId) throws IOException {
       return;
     }
 
-    DatanodeLayoutStorage layoutStorage
-        = new DatanodeLayoutStorage(config);
+    DatanodeStorage layoutStorage
+        = new DatanodeStorage(config);
     layoutStorage.setClusterId(clusterId);
     layoutStorage.persistCurrentState();
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
deleted file mode 100644
index 7fd352ac0f2..00000000000
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.upgrade;
-
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
-import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
-import org.apache.hadoop.ozone.upgrade.LayoutFeature;
-import org.apache.hadoop.ozone.upgrade.UpgradeException;
-
-/**
- * UpgradeFinalizer for the DataNode.
- */
-public class DataNodeUpgradeFinalizer extends
-    BasicUpgradeFinalizer<DatanodeStateMachine, HDDSLayoutVersionManager> {
-
-  public DataNodeUpgradeFinalizer(HDDSLayoutVersionManager versionManager) {
-    super(versionManager);
-  }
-
-  @Override
-  public void finalizeLayoutFeature(LayoutFeature layoutFeature,
-      DatanodeStateMachine dsm) throws UpgradeException {
-    if (layoutFeature instanceof HDDSLayoutFeature) {
-      HDDSLayoutFeature hddslayoutFeature =  (HDDSLayoutFeature)layoutFeature;
-      super.finalizeLayoutFeature(hddslayoutFeature,
-          hddslayoutFeature
-              .datanodeAction(),
-          dsm.getLayoutStorage());
-    } else {
-      String msg = String.format("Failed to finalize datanode layout feature " 
+
-          "%s. It is not an HDDS Layout Feature.", layoutFeature);
-      throw new UpgradeException(msg,
-          UpgradeException.ResultCodes.UPGRADE_FINALIZATION_FAILED);
-    }
-  }
-}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeUpgradeActionProvider.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeUpgradeActionProvider.java
new file mode 100644
index 00000000000..5f280b2887f
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeUpgradeActionProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.upgrade;
+
+import org.apache.hadoop.hdds.ComponentVersion;
+import org.apache.hadoop.hdds.upgrade.DatanodeUpgradeAction;
+import org.apache.hadoop.ozone.upgrade.AbstractUpgradeActionProvider;
+import org.apache.hadoop.ozone.upgrade.UpgradeActionDatanode;
+
+/**
+ * Loads {@link DatanodeUpgradeAction} implementations annotated with {@link 
UpgradeActionDatanode}.
+ */
+public final class DatanodeUpgradeActionProvider extends 
AbstractUpgradeActionProvider<DatanodeUpgradeAction> {
+
+  public static final String DATANODE_UPGRADE_CLASS_PACKAGE = 
"org.apache.hadoop.ozone.container";
+
+  public DatanodeUpgradeActionProvider() {
+    super(UpgradeActionDatanode.class, DatanodeUpgradeAction.class, 
DATANODE_UPGRADE_CLASS_PACKAGE);
+  }
+
+  @Override
+  protected ComponentVersion extractVersion(Class<?> clazz) {
+    UpgradeActionDatanode annotation = 
clazz.getAnnotation(UpgradeActionDatanode.class);
+    return annotation.feature();
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeVersionManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeVersionManager.java
new file mode 100644
index 00000000000..17f92c19bcb
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeVersionManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.upgrade;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.hdds.ComponentVersion;
+import org.apache.hadoop.hdds.upgrade.DatanodeUpgradeAction;
+import org.apache.hadoop.hdds.upgrade.DatanodeUpgradeActionProvider;
+import org.apache.hadoop.hdds.upgrade.HDDSVersionManager;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.upgrade.ComponentUpgradeActionProvider;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
+
+/**
+ * Datanode-specific version manager that wires upgrade actions internally.
+ */
+public class DatanodeVersionManager extends HDDSVersionManager {
+
+  private final Map<ComponentVersion, DatanodeUpgradeAction> upgradeActions;
+  private final DatanodeStateMachine upgradeActionArg;
+
+  public DatanodeVersionManager(DatanodeStorage storage, DatanodeStateMachine 
upgradeActionArg) throws IOException {
+    this(storage, upgradeActionArg, new DatanodeUpgradeActionProvider());
+  }
+
+  @VisibleForTesting
+  public DatanodeVersionManager(DatanodeStorage storage, DatanodeStateMachine 
upgradeActionArg,
+      ComponentUpgradeActionProvider<DatanodeUpgradeAction> 
upgradeActionProvider) throws IOException {
+    super(storage);
+    this.upgradeActionArg = upgradeActionArg;
+    upgradeActions = upgradeActionProvider.load();
+  }
+
+  @VisibleForTesting
+  public Map<ComponentVersion, DatanodeUpgradeAction> 
getUpgradeActionsForTesting() {
+    return upgradeActions;
+  }
+
+  @Override
+  protected void runUpgradeAction(ComponentVersion version) throws 
UpgradeException {
+    DatanodeUpgradeAction action = upgradeActions.get(version);
+    if (action == null) {
+      return;
+    }
+    try {
+      action.execute(upgradeActionArg);
+    } catch (Exception e) {
+      logAndThrow(e, "Datanode upgrade action for version " + version + " 
failed.",
+          UpgradeException.ResultCodes.FINALIZE_UPGRADE_ACTION_FAILED);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/UpgradeUtils.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/UpgradeUtils.java
index 5a1a2916bd2..d96aab53b5e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/UpgradeUtils.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/UpgradeUtils.java
@@ -17,8 +17,7 @@
 
 package org.apache.hadoop.ozone.container.upgrade;
 
-import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
-
+import org.apache.hadoop.hdds.HDDSVersion;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 
 /**
@@ -30,9 +29,10 @@ private UpgradeUtils() {
   }
 
   public static LayoutVersionProto defaultLayoutVersionProto() {
+    int softwareVersion = HDDSVersion.SOFTWARE_VERSION.serialize();
     return LayoutVersionProto.newBuilder()
-        .setMetadataLayoutVersion(maxLayoutVersion())
-        .setSoftwareLayoutVersion(maxLayoutVersion()).build();
+        .setMetadataLayoutVersion(softwareVersion)
+        .setSoftwareLayoutVersion(softwareVersion).build();
   }
 
   public static LayoutVersionProto toLayoutVersionProto(int mLv, int sLv) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java
index acaabdad6e4..65bd73b874c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/VersionedDatanodeFeatures.java
@@ -27,24 +27,22 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 
 /**
  * Utility class to retrieve the version of a feature that corresponds to the
- * metadata layout version specified by the provided
- * {@link HDDSLayoutVersionManager}.
+ * apparent version specified by the provided
+ * {@link DatanodeVersionManager}.
  */
 public final class VersionedDatanodeFeatures {
-  private static HDDSLayoutVersionManager versionManager;
+  private static DatanodeVersionManager versionManager;
 
   private VersionedDatanodeFeatures() { }
 
-  public static void initialize(
-      HDDSLayoutVersionManager manager) {
+  public static void initialize(DatanodeVersionManager manager) {
     versionManager = manager;
   }
 
@@ -181,14 +179,14 @@ public static class DatanodePorts {
           HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS);
     }
 
-    private static boolean shouldPersistPort(DatanodeDetails.Port port, 
DatanodeLayoutStorage datanodeLayoutStorage) {
+    private static boolean shouldPersistPort(DatanodeDetails.Port port, 
DatanodeStorage datanodeLayoutStorage) {
       HDDSLayoutFeature portVersion = PORT_TO_VERSION.get(port.getName());
       return portVersion == null || 
portVersion.isSupportedBy(datanodeLayoutStorage.getApparentVersion());
     }
 
     public static Map<String, Integer> getPortsToPersist(DatanodeDetails 
datanodeDetails, ConfigurationSource conf)
         throws IOException {
-      DatanodeLayoutStorage datanodeLayoutStorage = new 
DatanodeLayoutStorage(conf, datanodeDetails.getUuidString());
+      DatanodeStorage datanodeLayoutStorage = new DatanodeStorage(conf, 
datanodeDetails.getUuidString());
       Map<String, Integer> portDetails = new LinkedHashMap<>();
 
       final List<DatanodeDetails.Port> ports = datanodeDetails.getPorts();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeNewLayoutVersionCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeVersionCommand.java
similarity index 77%
rename from 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeNewLayoutVersionCommand.java
rename to 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeVersionCommand.java
index 4d284171b20..181a6f6b1a6 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeNewLayoutVersionCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeVersionCommand.java
@@ -23,27 +23,27 @@
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 
 /**
- * Asks DataNode to Finalize new upgrade version.
+ * Asks DataNode to finalize new upgrade version.
  */
-public class FinalizeNewLayoutVersionCommand
+public class FinalizeVersionCommand
     extends SCMCommand<FinalizeNewLayoutVersionCommandProto> {
 
   private boolean finalizeUpgrade = false;
-  private LayoutVersionProto layoutInfo;
+  private LayoutVersionProto versionInfo;
 
-  public FinalizeNewLayoutVersionCommand(boolean finalizeNewLayoutVersion,
-                                         LayoutVersionProto layoutInfo,
-                                         long id) {
+  public FinalizeVersionCommand(boolean finalizeNewLayoutVersion,
+                                LayoutVersionProto versionInfo,
+                                long id) {
     super(id);
     finalizeUpgrade = finalizeNewLayoutVersion;
-    this.layoutInfo = layoutInfo;
+    this.versionInfo = versionInfo;
   }
 
-  public FinalizeNewLayoutVersionCommand(boolean finalizeNewLayoutVersion,
-                                         LayoutVersionProto layoutInfo) {
+  public FinalizeVersionCommand(boolean finalizeNewLayoutVersion,
+                                LayoutVersionProto versionInfo) {
     super();
     finalizeUpgrade = finalizeNewLayoutVersion;
-    this.layoutInfo = layoutInfo;
+    this.versionInfo = versionInfo;
   }
 
   /**
@@ -61,14 +61,14 @@ public FinalizeNewLayoutVersionCommandProto getProto() {
     return FinalizeNewLayoutVersionCommandProto.newBuilder()
         .setFinalizeNewLayoutVersion(finalizeUpgrade)
         .setCmdId(getId())
-        .setDataNodeLayoutVersion(layoutInfo)
+        .setDataNodeLayoutVersion(versionInfo)
         .build();
   }
 
-  public static  FinalizeNewLayoutVersionCommand getFromProtobuf(
+  public static FinalizeVersionCommand getFromProtobuf(
       FinalizeNewLayoutVersionCommandProto finalizeProto) {
     Objects.requireNonNull(finalizeProto, "finalizeProto == null");
-    return new FinalizeNewLayoutVersionCommand(
+    return new FinalizeVersionCommand(
         finalizeProto.getFinalizeNewLayoutVersion(),
         finalizeProto.getDataNodeLayoutVersion(), finalizeProto.getCmdId());
   }
@@ -82,7 +82,7 @@ public String toString() {
         .append(", term: ").append(getTerm())
         .append(", deadlineMsSinceEpoch: ").append(getDeadline())
         .append(", finalizeUpgrade: ").append(finalizeUpgrade)
-        .append(", layoutInfo: ").append(layoutInfo);
+        .append(", versionInfo: ").append(versionInfo);
     return sb.toString();
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index bdef642204f..69693b6a656 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -443,10 +443,10 @@ public static XceiverServerRatis newXceiverServerRatis(
         null, null);
   }
 
-  /** Initialize {@link DatanodeLayoutStorage}.  Normally this is done during 
{@link HddsDatanodeService} start,
+  /** Initialize {@link DatanodeStorage}.  Normally this is done during {@link 
HddsDatanodeService} start,
    * have to do the same for tests that create {@link OzoneContainer} 
manually. */
   public static void initializeDatanodeLayout(ConfigurationSource conf, 
DatanodeDetails dn) throws IOException {
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf, 
dn.getUuidString());
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf, 
dn.getUuidString());
     if (layoutStorage.getState() != INITIALIZED) {
       layoutStorage.initialize();
     }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 721f50bec13..1019fae622b 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -229,7 +229,7 @@ public void testDatanodeStateContext() throws IOException,
       task = stateMachine.getContext().getTask();
       assertEquals(RunningDatanodeState.class, task.getClass());
 
-      DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+      DatanodeStorage layoutStorage = new DatanodeStorage(conf,
           UUID.randomUUID().toString(),
           HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion());
       layoutStorage.initialize();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeIdYaml.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeIdYaml.java
index 3cde9207bd1..334e08ad117 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeIdYaml.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeIdYaml.java
@@ -29,7 +29,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -60,7 +60,7 @@ void 
testWriteReadBeforeRatisDatastreamPortLayoutVersion(@TempDir File dir)
     File file = new File(dir, "datanode.yaml");
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString(),
         HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion());
     layoutStorage.initialize();
@@ -83,7 +83,7 @@ void 
testWriteReadAfterRatisDatastreamPortLayoutVersion(@TempDir File dir)
     File file = new File(dir, "datanode.yaml");
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString(),
         HDDSLayoutFeature.RATIS_DATASTREAM_PORT_IN_DATANODEDETAILS
             .layoutVersion());
@@ -104,7 +104,7 @@ void testWriteReadBeforeWebUIPortLayoutVersion(@TempDir 
File dir)
     File file = new File(dir, "datanode.yaml");
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString(),
         HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion());
     layoutStorage.initialize();
@@ -125,7 +125,7 @@ void testWriteReadAfterWebUIPortLayoutVersion(@TempDir File 
dir)
     File file = new File(dir, "datanode.yaml");
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString(),
         HDDSLayoutFeature.WEBUI_PORTS_IN_DATANODEDETAILS.layoutVersion());
     layoutStorage.initialize();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index e2b4fa167ea..4e9ab80cb13 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -20,7 +20,6 @@
 import static java.util.Collections.emptyList;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconcileContainerCommand;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
-import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -37,6 +36,7 @@
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.hadoop.hdds.HDDSVersion;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -50,12 +50,12 @@
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
 import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
@@ -97,7 +97,7 @@ public void handlesReconstructContainerCommand() throws 
Exception {
 
     OzoneConfiguration conf = new OzoneConfiguration();
     DatanodeStateMachine datanodeStateMachine =
-        mock(DatanodeStateMachine.class);
+        mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -136,7 +136,7 @@ public void testHandlesReconcileContainerCommand() throws 
Exception {
                 .build());
 
     OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine = 
mock(DatanodeStateMachine.class);
+    DatanodeStateMachine datanodeStateMachine = mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -170,7 +170,7 @@ public void testheartbeatWithoutReports() throws Exception {
                 .build());
 
     OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine = 
mock(DatanodeStateMachine.class);
+    DatanodeStateMachine datanodeStateMachine = mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -194,7 +194,7 @@ public void testheartbeatWithoutReports() throws Exception {
   @Test
   public void testheartbeatWithNodeReports() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine = 
mock(DatanodeStateMachine.class);
+    DatanodeStateMachine datanodeStateMachine = mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -229,7 +229,7 @@ public void testheartbeatWithNodeReports() throws Exception 
{
   @Test
   public void testheartbeatWithContainerReports() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine = 
mock(DatanodeStateMachine.class);
+    DatanodeStateMachine datanodeStateMachine = mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -265,7 +265,7 @@ public void testheartbeatWithContainerReports() throws 
Exception {
   @Test
   public void testheartbeatWithCommandStatusReports() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine = 
mock(DatanodeStateMachine.class);
+    DatanodeStateMachine datanodeStateMachine = mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -302,7 +302,7 @@ public void testheartbeatWithCommandStatusReports() throws 
Exception {
   @Test
   public void testheartbeatWithContainerActions() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine = 
mock(DatanodeStateMachine.class);
+    DatanodeStateMachine datanodeStateMachine = mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -339,7 +339,7 @@ public void testheartbeatWithContainerActions() throws 
Exception {
   public void testheartbeatWithAllReports() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     DatanodeStateMachine datanodeStateMachine =
-        mock(DatanodeStateMachine.class);
+        mockDatanodeStateMachine();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -419,20 +419,22 @@ private HeartbeatEndpointTask getHeartbeatEndpointTask(
     when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
     when(endpointStateMachine.getAddress())
         .thenReturn(TEST_SCM_ENDPOINT);
-    HDDSLayoutVersionManager layoutVersionManager =
-        mock(HDDSLayoutVersionManager.class);
-    when(layoutVersionManager.getSoftwareLayoutVersion())
-        .thenReturn(maxLayoutVersion());
-    when(layoutVersionManager.getMetadataLayoutVersion())
-        .thenReturn(maxLayoutVersion());
     return HeartbeatEndpointTask.newBuilder()
         .setConfig(conf)
         .setDatanodeDetails(datanodeDetails)
         .setContext(context)
-        .setLayoutVersionManager(layoutVersionManager)
         .setEndpointStateMachine(endpointStateMachine)
         .build();
   }
+  
+  private DatanodeStateMachine mockDatanodeStateMachine() {
+    DatanodeVersionManager versionManager = mock(DatanodeVersionManager.class);
+    
when(versionManager.getSoftwareVersion()).thenReturn(HDDSVersion.SOFTWARE_VERSION);
+    
when(versionManager.getApparentVersion()).thenReturn(HDDSVersion.SOFTWARE_VERSION);
+    DatanodeStateMachine mockDSM = mock(DatanodeStateMachine.class);
+    when(mockDSM.getVersionManager()).thenReturn(versionManager);
+    return mockDSM;
+  }
 
   private ContainerAction getContainerAction() {
     ContainerAction.Builder builder = ContainerAction.newBuilder();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDataNodeStartupSlvLessThanMlv.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeStartupInvalidApparentVersion.java
similarity index 53%
rename from 
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDataNodeStartupSlvLessThanMlv.java
rename to 
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeStartupInvalidApparentVersion.java
index f5681e958be..ccc94d9aa2c 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDataNodeStartupSlvLessThanMlv.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeStartupInvalidApparentVersion.java
@@ -17,70 +17,64 @@
 
 package org.apache.hadoop.ozone.container.upgrade;
 
-import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
+import static org.apache.hadoop.hdds.HDDSVersion.SOFTWARE_VERSION;
+import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.ozone.OzoneConsts.DATANODE_LAYOUT_VERSION_DIR;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.UUID;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.upgrade.UpgradeTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 /**
- * Tests that DataNode will throw an exception on creation when it reads in a
- * VERSION file indicating a metadata layout version larger than its
- * software layout version.
+ * Ensures Datanode does not start when the VERSION file records an apparent 
version newer than
+ * {@link org.apache.hadoop.hdds.HDDSVersion#SOFTWARE_VERSION}.
  */
-public class TestDataNodeStartupSlvLessThanMlv {
+public class TestDatanodeStartupInvalidApparentVersion {
   @TempDir
   private Path tempFolder;
 
   @Test
-  public void testStartupSlvLessThanMlv() throws Exception {
-    // Add subdirectories under the temporary folder where the version file
-    // will be placed.
+  public void 
testStartupFailsWhenApparentVersionBetweenLastLayoutFeatureAndZdu()
+      throws Exception {
+    assertStartupFailsWithComponentVersionMessage(
+        HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION.layoutVersion() + 1);
+  }
+
+  @Test
+  public void 
testStartupFailsWhenApparentVersionBeyondLastKnownComponentVersion()
+      throws Exception {
+    assertStartupFailsWithComponentVersionMessage(SOFTWARE_VERSION.serialize() 
+ 1);
+  }
+
+  private void assertStartupFailsWithComponentVersionMessage(int 
serializedApparentVersion)
+      throws Exception {
     File datanodeSubdir = Files.createDirectory(
         tempFolder.resolve(DATANODE_LAYOUT_VERSION_DIR)).toFile();
 
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
 
-    // Set metadata layout version larger then software layout version.
-    int largestSlv = maxLayoutVersion();
-    int mlv = largestSlv + 1;
-
-    // Create version file with MLV > SLV, which should fail the
-    // DataNodeStateMachine construction.
     UpgradeTestUtils.createVersionFile(datanodeSubdir,
-        HddsProtos.NodeType.DATANODE, mlv);
+        HddsProtos.NodeType.DATANODE, serializedApparentVersion);
 
-    IOException ioException = assertThrows(IOException.class,
-        () -> new DatanodeStateMachine(getNewDatanodeDetails(), conf));
-    assertThat(ioException).hasMessageEndingWith(
-        String.format("Metadata layout version (%s) > software layout version 
(%s)", mlv, largestSlv));
-  }
+    String expectedMessage =
+        "Initialization failed. Disk contains unknown apparent version " + 
serializedApparentVersion
+            + " for software version " + SOFTWARE_VERSION + ". Make sure this 
component was not downgraded after"
+            + " finalization";
 
-  private DatanodeDetails getNewDatanodeDetails() {
-    DatanodeDetails.Port containerPort = DatanodeDetails.newStandalonePort(0);
-    DatanodeDetails.Port ratisPort = DatanodeDetails.newRatisPort(0);
-    DatanodeDetails.Port restPort = DatanodeDetails.newRestPort(0);
-    return DatanodeDetails.newBuilder()
-        .setUuid(UUID.randomUUID())
-        .setHostName("localhost")
-        .setIpAddress("127.0.0.1")
-        .addPort(containerPort)
-        .addPort(ratisPort)
-        .addPort(restPort)
-        .build();
+    IOException ioException = assertThrows(IOException.class,
+        () -> new DatanodeStateMachine(randomDatanodeDetails(), conf));
+    assertEquals(expectedMessage, ioException.getMessage());
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToContainerIdsTable.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToContainerIdsTable.java
index 5fb5ba1edd6..86deef6fae0 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToContainerIdsTable.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToContainerIdsTable.java
@@ -104,8 +104,8 @@ public void testContainerTableAccessBeforeAndAfterUpgrade() 
throws Exception {
     // close container to allow upgrade.
     UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
 
-    dsm.finalizeUpgrade();
-    
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.WITNESSED_CONTAINER_DB_PROTO_VALUE));
+    dsm.getVersionManager().finalizeUpgrade();
+    
assertTrue(dsm.getVersionManager().isAllowed(HDDSLayoutFeature.WITNESSED_CONTAINER_DB_PROTO_VALUE));
     
assertEquals(WitnessedContainerDBDefinition.CONTAINER_CREATE_INFO_TABLE_DEF.getName(),
         metadataStore.getContainerCreateInfoTable().getName());
     ContainerCreateInfo containerCreateInfo = 
metadataStore.getContainerCreateInfoTable().get(
@@ -162,8 +162,8 @@ public void testContainerTableFinalizeRetry() throws 
Exception {
     }
 
     // trigger another upgrade which will update metainfo for upgrade
-    dsm.finalizeUpgrade();
-    
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.WITNESSED_CONTAINER_DB_PROTO_VALUE));
+    dsm.getVersionManager().finalizeUpgrade();
+    
assertTrue(dsm.getVersionManager().isAllowed(HDDSLayoutFeature.WITNESSED_CONTAINER_DB_PROTO_VALUE));
     
assertEquals(WitnessedContainerDBDefinition.CONTAINER_CREATE_INFO_TABLE_DEF.getName(),
         metadataStore.getContainerCreateInfoTable().getName());
     ContainerCreateInfo containerCreateInfo
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
index 214ea1d167d..11bec6a5dd1 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
@@ -97,8 +97,8 @@ public void testIncrementalChunkListBeforeAndAfterUpgrade() 
throws Exception {
     // close container to allow upgrade.
     UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
 
-    dsm.finalizeUpgrade();
-    
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
+    dsm.getVersionManager().finalizeUpgrade();
+    
assertTrue(dsm.getVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
     // open a new container after finalization
     final long containerID2 = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
     // incremental chunk list should work after finalizing.
@@ -134,8 +134,8 @@ public void testBlockFinalizationBeforeAndAfterUpgrade() 
throws Exception {
     // close container to allow upgrade.
     UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
 
-    dsm.finalizeUpgrade();
-    
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
+    dsm.getVersionManager().finalizeUpgrade();
+    
assertTrue(dsm.getVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
     final long containerID2 = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
     ContainerProtos.WriteChunkRequestProto writeChunk2 =
         UpgradeTestHelper.putBlock(dispatcher, containerID2, pipeline, false);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
index 1b8470d8a2c..3747514a95c 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
@@ -50,7 +50,7 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -127,7 +127,7 @@ public void testDBOnHddsVolume(boolean schemaV3Enabled) 
throws Exception {
     assertNull(dataVolume.getDbVolume());
     assertFalse(dataVolume.isDbLoaded());
 
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
     // RocksDB is created during upgrade
     File dbFile = new File(dataVolume.getStorageDir().getAbsolutePath() + "/" +
         dataVolume.getClusterID() + "/" + dataVolume.getStorageID());
@@ -164,7 +164,7 @@ public void testDBOnDbVolume(boolean schemaV3Enabled) 
throws Exception {
         .getVolumesList().get(0);
     assertNull(dataVolume.getDbParentDir());
 
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
     // RocksDB is created during upgrade
     DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
         .getVolumesList().get(0);
@@ -200,7 +200,7 @@ public void testDBCreatedInFinalize(boolean schemaV3Enabled)
     UpgradeTestHelper.addHddsVolume(conf, tempFolder);
 
     // Set layout version.
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString(),
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     layoutStorage.initialize();
@@ -220,7 +220,7 @@ public void testDBCreatedInFinalize(boolean schemaV3Enabled)
     // Restart DN and finalize upgrade
     dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, 
address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
 
     // RocksDB is created by upgrade action
     dataVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
@@ -250,13 +250,13 @@ public void testFinalizeTwice(boolean schemaV3Enabled) 
throws Exception {
 
     dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
 
     DbVolume dbVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
         .getVolumesList().get(0)).getDbVolume();
     assertNotNull(dbVolume);
 
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
     // DB Dir should be the same.
     assertEquals(dbVolume, ((HddsVolume) dsm.getContainer()
         .getVolumeSet().getVolumesList().get(0)).getDbVolume());
@@ -277,7 +277,7 @@ public void testAddHddsVolumeAfterFinalize(boolean 
schemaV3Enabled)
 
     dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
 
     // Add a new HddsVolume. It should have DB created after DN restart.
     UpgradeTestHelper.addHddsVolume(conf, tempFolder);
@@ -314,7 +314,7 @@ public void testAddDbVolumeAfterFinalize(boolean 
schemaV3Enabled)
     HddsVolume hddsVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
         .getVolumesList().get(0);
     assertNull(hddsVolume.getDbParentDir());
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
     // DB is created during upgrade
     File dbDir = hddsVolume.getDbParentDir();
     assertTrue(dbDir.getAbsolutePath().startsWith(
@@ -354,7 +354,7 @@ public void testAddDbAndHddsVolumeAfterFinalize(boolean 
schemaV3Enabled)
 
     dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
 
     UpgradeTestHelper.addDbVolume(conf, tempFolder);
     File newDataVolume = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
@@ -424,7 +424,7 @@ public void testWrite(boolean enable, String 
expectedVersion)
     dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     ContainerDispatcher dispatcher = dsm.getContainer().getDispatcher();
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
 
     final Pipeline pipeline = MockPipeline.createPipeline(
         Collections.singletonList(dsm.getDatanodeDetails()));
@@ -486,7 +486,7 @@ public void testReadsDuringFinalize(boolean schemaV3Enabled)
     ExecutorService executor = Executors.newFixedThreadPool(1);
     Future<Void> readFuture = executor.submit(() -> {
       // Layout version check should be thread safe.
-      while (!dsm.getLayoutVersionManager()
+      while (!dsm.getVersionManager()
           .isAllowed(HDDSLayoutFeature.DATANODE_SCHEMA_V3)) {
         UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
       }
@@ -495,7 +495,7 @@ public void testReadsDuringFinalize(boolean schemaV3Enabled)
       return null;
     });
 
-    dsm.finalizeUpgrade();
+    dsm.getVersionManager().finalizeUpgrade();
     // If there was a failure reading during the upgrade, the exception will
     // be thrown here.
     readFuture.get();
@@ -514,7 +514,7 @@ public void testFinalizeFailure(boolean schemaV3Enabled) 
throws Exception {
     UpgradeTestHelper.addHddsVolume(conf, tempFolder);
     // Let HddsVolume be formatted to mimic the real cluster upgrade
     // Set layout version.
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString(),
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     layoutStorage.initialize();
@@ -557,7 +557,7 @@ public void testFinalizeFailure(boolean schemaV3Enabled) 
throws Exception {
 
     // Finalize will fail because of DB creation failure
     try {
-      dsm.finalizeUpgrade();
+      dsm.getVersionManager().finalizeUpgrade();
     } catch (Exception e) {
       // Currently there will be retry if finalization failed.
       // Let's assume retry is terminated by user.
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeVersionManager.java
similarity index 50%
copy from 
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
copy to 
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeVersionManager.java
index 9e89a7780de..93d33ca9aea 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeVersionManager.java
@@ -15,15 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.upgrade;
-
-import static org.apache.hadoop.ozone.OzoneManagerVersion.SOFTWARE_VERSION;
-import static org.apache.hadoop.ozone.OzoneManagerVersion.ZDU;
-import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.DELEGATION_TOKEN_SYMMETRIC_SIGN;
-import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT;
-import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.HBASE_SUPPORT;
-import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.INITIAL_VERSION;
-import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.QUOTA;
+package org.apache.hadoop.ozone.container.upgrade;
+
+import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATANODE_SCHEMA_V3;
+import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT;
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.INITIAL_VERSION;
+import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION;
+import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.WITNESSED_CONTAINER_DB_PROTO_VALUE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -39,7 +37,6 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -52,13 +49,17 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import org.apache.hadoop.hdds.ComponentVersion;
+import org.apache.hadoop.hdds.HDDSVersion;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneManagerVersion;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMStorage;
-import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.hdds.upgrade.DatanodeUpgradeAction;
+import org.apache.hadoop.hdds.upgrade.DatanodeUpgradeActionProvider;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.upgrade.AbstractComponentVersionManagerTest;
 import org.apache.hadoop.ozone.upgrade.ComponentUpgradeActionProvider;
+import org.apache.hadoop.ozone.upgrade.ComponentVersionManager;
 import org.apache.hadoop.ozone.upgrade.UpgradeException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,23 +68,23 @@
 import org.mockito.Mockito;
 
 /**
- * Tests for {@link OMVersionManager}. Shared abstract tests use an empty 
upgrade-action map; action behavior is
- * covered with map-based providers and a required classpath discovery test.
+ * Tests for {@link DatanodeVersionManager} using on-disk {@link 
DatanodeStorage} under a JUnit temp directory.
  */
-class TestOMVersionManager extends AbstractComponentVersionManagerTest {
-
-  private static final List<ComponentVersion> ALL_VERSIONS;
+class TestDatanodeVersionManager extends AbstractComponentVersionManagerTest {
 
   private OzoneConfiguration conf;
 
   @TempDir
-  private Path tempFolder;
+  private Path tempDir;
+
+  private static final List<ComponentVersion> ALL_VERSIONS;
 
   static {
-    ALL_VERSIONS = new ArrayList<>(Arrays.asList(OMLayoutFeature.values()));
-    for (OzoneManagerVersion version : OzoneManagerVersion.values()) {
+    ALL_VERSIONS = new ArrayList<>(Arrays.asList(HDDSLayoutFeature.values()));
+
+    for (HDDSVersion version : HDDSVersion.values()) {
       // Add all defined versions after and including ZDU to get the complete 
version list.
-      if (ZDU.isSupportedBy(version) && version != 
OzoneManagerVersion.FUTURE_VERSION) {
+      if (HDDSVersion.ZDU.isSupportedBy(version) && version != 
HDDSVersion.FUTURE_VERSION) {
         ALL_VERSIONS.add(version);
       }
     }
@@ -101,17 +102,24 @@ public static Stream<Arguments> preFinalizedVersionArgs() 
{
   }
 
   @Override
-  protected OMVersionManager createManager(int serializedApparentVersion) 
throws IOException {
-    // By default create a version manager which does not have any upgrade 
actions to run. Production upgrade actions
-    // may not be able to run in a test environment during finalization.
+  protected ComponentVersionManager createManager(int 
serializedApparentVersion) throws IOException {
     return createManager(serializedApparentVersion, HashMap::new);
   }
 
-  private OMVersionManager createManager(int serializedApparentVersion,
-      ComponentUpgradeActionProvider<OmUpgradeAction> actions) throws 
IOException {
-    OMStorage storage = newOmStorage(serializedApparentVersion);
-    OzoneManager mockOM = Mockito.mock(OzoneManager.class);
-    return new OMVersionManager(storage, mockOM, actions);
+  private DatanodeVersionManager createManager(int serializedApparentVersion,
+      ComponentUpgradeActionProvider<DatanodeUpgradeAction> actions) throws 
IOException {
+    DatanodeStorage storage = newDatanodeStorage(serializedApparentVersion);
+    return new DatanodeVersionManager(storage, null, actions);
+  }
+
+  private DatanodeStorage newDatanodeStorage(int apparentVersion) throws 
IOException {
+    Path storageRoot = Files.createTempDirectory(tempDir, 
"dn-version-manager-");
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageRoot.toString());
+    DatanodeStorage storage = new DatanodeStorage(conf, 
UUID.randomUUID().toString(), apparentVersion);
+    storage.setApparentVersion(apparentVersion);
+    assertTrue(storage.getCurrentDir().mkdirs());
+    storage.persistCurrentState();
+    return storage;
   }
 
   @Override
@@ -121,72 +129,79 @@ protected List<ComponentVersion> allVersionsInOrder() {
 
   @Override
   protected ComponentVersion expectedSoftwareVersion() {
-    return OzoneManagerVersion.SOFTWARE_VERSION;
+    return HDDSVersion.SOFTWARE_VERSION;
   }
 
+  @Override
   @Test
   public void testClasspathScanDiscoversUpgradeActions() throws Exception {
-    // Regardless of whether OM is finalized, the same set of upgrade actions 
should be loaded.
-    try (OMVersionManager versionManager = 
createManager(INITIAL_VERSION.serialize(), new OMUpgradeActionProvider())) {
+    // Regardless of whether Datanode is finalized, the same set of upgrade 
actions should be loaded.
+    try (DatanodeVersionManager versionManager = 
createManager(INITIAL_VERSION.serialize(),
+        new DatanodeUpgradeActionProvider())) {
       assertTrue(versionManager.needsFinalization());
-      OmUpgradeAction quotaAction = 
versionManager.getUpgradeActionsForTesting().get(QUOTA);
-      assertInstanceOf(QuotaRepairUpgradeAction.class, quotaAction);
+      DatanodeUpgradeAction quotaAction = 
versionManager.getUpgradeActionsForTesting().get(DATANODE_SCHEMA_V3);
+      assertInstanceOf(DatanodeSchemaV3FinalizeAction.class, quotaAction);
     }
 
-    try (OMVersionManager versionManager = 
createManager(SOFTWARE_VERSION.serialize(), new OMUpgradeActionProvider())) {
+    try (DatanodeVersionManager versionManager =
+        createManager(HDDSVersion.SOFTWARE_VERSION.serialize(), new 
DatanodeUpgradeActionProvider())) {
       assertFalse(versionManager.needsFinalization());
-      OmUpgradeAction quotaAction = 
versionManager.getUpgradeActionsForTesting().get(QUOTA);
-      assertInstanceOf(QuotaRepairUpgradeAction.class, quotaAction);
+      DatanodeUpgradeAction quotaAction = 
versionManager.getUpgradeActionsForTesting().get(DATANODE_SCHEMA_V3);
+      assertInstanceOf(DatanodeSchemaV3FinalizeAction.class, quotaAction);
     }
   }
 
+  @Override
   @Test
   public void testFinalizeRunsSuppliedUpgradeAction() throws Exception {
-    OmUpgradeAction mockECAction = mock(OmUpgradeAction.class);
-    OmUpgradeAction mockZDUAction = mock(OmUpgradeAction.class);
+    DatanodeUpgradeAction mockECAction = mock(DatanodeUpgradeAction.class);
+    DatanodeUpgradeAction mockZDUAction = mock(DatanodeUpgradeAction.class);
 
-    ComponentUpgradeActionProvider<OmUpgradeAction> provider = () -> {
-      Map<ComponentVersion, OmUpgradeAction> m = new HashMap<>();
+    ComponentUpgradeActionProvider<DatanodeUpgradeAction> provider = () -> {
+      Map<ComponentVersion, DatanodeUpgradeAction> m = new HashMap<>();
       m.put(ERASURE_CODED_STORAGE_SUPPORT, mockECAction);
-      m.put(ZDU, mockZDUAction);
+      m.put(HDDSVersion.ZDU, mockZDUAction);
       return m;
     };
 
-    try (OMVersionManager versionManager = createManager(QUOTA.serialize(), 
provider)) {
+    try (DatanodeVersionManager versionManager = 
createManager(DATANODE_SCHEMA_V3.serialize(), provider)) {
       versionManager.finalizeUpgrade();
-      assertEquals(OzoneManagerVersion.SOFTWARE_VERSION, 
versionManager.getApparentVersion());
+      assertEquals(HDDSVersion.SOFTWARE_VERSION, 
versionManager.getApparentVersion());
 
-      // QUOTA was added after EC, so the EC upgrade action should not run 
when we finalize from this version.
+      // DATANODE_SCHEMA_V3 was added after EC, so the EC upgrade action 
should not run when we finalize from this
+      // version.
       verify(mockECAction, never()).execute(any());
       verify(mockZDUAction, atLeastOnce()).execute(any());
-      assertOmApparentVersionOnDisk(conf, 
OzoneManagerVersion.SOFTWARE_VERSION.serialize());
+      assertDatanodeApparentVersionOnDisk(conf, 
HDDSVersion.SOFTWARE_VERSION.serialize());
     }
   }
 
+  @Override
   @Test
   public void testUpgradeActionFailureAbortsFinalize() throws Exception {
-    ComponentUpgradeActionProvider<OmUpgradeAction> provider = () -> {
-      Map<ComponentVersion, OmUpgradeAction> m = new HashMap<>();
-      m.put(DELEGATION_TOKEN_SYMMETRIC_SIGN, o -> {
+    ComponentUpgradeActionProvider<DatanodeUpgradeAction> provider = () -> {
+      Map<ComponentVersion, DatanodeUpgradeAction> m = new HashMap<>();
+      m.put(STORAGE_SPACE_DISTRIBUTION, o -> {
         throw new IOException("expected test failure");
       });
       return m;
     };
 
-    try (OMVersionManager versionManager = createManager(QUOTA.serialize(), 
provider)) {
-      UpgradeException thrown =
-          assertThrows(UpgradeException.class, 
versionManager::finalizeUpgrade);
+    try (DatanodeVersionManager versionManager =
+             createManager(WITNESSED_CONTAINER_DB_PROTO_VALUE.serialize(), 
provider)) {
+      UpgradeException thrown = assertThrows(UpgradeException.class, 
versionManager::finalizeUpgrade);
       
assertEquals(UpgradeException.ResultCodes.FINALIZE_UPGRADE_ACTION_FAILED, 
thrown.getResult());
-      // HBase is the version before symmetric encrypted delegation tokens, 
which has failed.
-      assertEquals(HBASE_SUPPORT, versionManager.getApparentVersion());
-      assertOmApparentVersionOnDisk(conf, HBASE_SUPPORT.serialize());
+      // WITNESSED_CONTAINER_DB_PROTO_VALUE is the version before 
STORAGE_SPACE_DISTRIBUTION, which has failed.
+      assertEquals(WITNESSED_CONTAINER_DB_PROTO_VALUE, 
versionManager.getApparentVersion());
+      assertDatanodeApparentVersionOnDisk(conf, 
WITNESSED_CONTAINER_DB_PROTO_VALUE.serialize());
     }
   }
 
+  @Override
   @Test
   public void testPersistFailureRollsBack() throws Exception {
     // Create a mock storage instance that throws when persisting version 
updates.
-    OMStorage storage = mock(OMStorage.class);
+    DatanodeStorage storage = mock(DatanodeStorage.class);
     AtomicInteger persistedApparentVersion = new 
AtomicInteger(INITIAL_VERSION.serialize());
     when(storage.getApparentVersion()).thenAnswer(invocation -> 
persistedApparentVersion.get());
     doAnswer(invocation -> {
@@ -195,8 +210,8 @@ public void testPersistFailureRollsBack() throws Exception {
     }).when(storage).setApparentVersion(anyInt());
     doThrow(new IOException("persist 
failed")).when(storage).persistCurrentState();
 
-    OzoneManager mockOM = Mockito.mock(OzoneManager.class);
-    try (OMVersionManager versionManager = new OMVersionManager(storage, 
mockOM, HashMap::new)) {
+    DatanodeStateMachine mockDN = Mockito.mock(DatanodeStateMachine.class);
+    try (DatanodeVersionManager versionManager = new 
DatanodeVersionManager(storage, mockDN, HashMap::new)) {
       assertEquals(INITIAL_VERSION, versionManager.getApparentVersion());
       UpgradeException thrown = assertThrows(UpgradeException.class, 
versionManager::finalizeUpgrade);
       
assertEquals(UpgradeException.ResultCodes.APPARENT_VERSION_UPDATE_FAILED, 
thrown.getResult());
@@ -205,22 +220,9 @@ public void testPersistFailureRollsBack() throws Exception 
{
     }
   }
 
-  private OMStorage newOmStorage(int apparentVersion)
+  private static void assertDatanodeApparentVersionOnDisk(OzoneConfiguration 
conf, int expected)
       throws IOException {
-    // Reinitialize the configuration to point to a new unique storage 
location.
-    Path dbDir = Files.createDirectory(new File(tempFolder.toFile(), 
UUID.randomUUID().toString()).toPath());
-    conf.set(OMConfigKeys.OZONE_OM_DB_DIRS, dbDir.toString());
-    OMStorage storage = new OMStorage(conf);
-    storage.setClusterId("test-cluster");
-    storage.setApparentVersion(apparentVersion);
-    storage.setOmId(UUID.randomUUID().toString());
-    storage.initialize();
-    storage.persistCurrentState();
-    return storage;
-  }
-
-  private static void assertOmApparentVersionOnDisk(OzoneConfiguration conf, 
int expected) throws IOException {
-    OMStorage reloaded = new OMStorage(conf);
+    DatanodeStorage reloaded = new DatanodeStorage(conf, 
UUID.randomUUID().toString());
     assertEquals(expected, reloaded.getApparentVersion());
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
index 2b6112bd0d9..e4a0da2114f 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
@@ -42,7 +42,7 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -71,13 +71,13 @@ private UpgradeTestHelper() {
   public static DatanodeStateMachine startPreFinalizedDatanode(
       OzoneConfiguration conf, Path tempFolder,
       DatanodeStateMachine dsm, InetSocketAddress address,
-      int metadataLayoutVersion)
+      int apparentVersion)
       throws Exception {
     // Set layout version.
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString(),
-        metadataLayoutVersion);
+        apparentVersion);
     layoutStorage.initialize();
     if (dsm != null) {
       dsm.close();
@@ -86,10 +86,10 @@ public static DatanodeStateMachine 
startPreFinalizedDatanode(
     // Build and start the datanode.
     DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
     dsm = new DatanodeStateMachine(dd, conf);
-    int actualMlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+    int actualApparentVersion = 
dsm.getVersionManager().getApparentVersion().serialize();
     assertEquals(
-        metadataLayoutVersion,
-        actualMlv);
+        apparentVersion,
+        actualApparentVersion);
 
 
     callVersionEndpointTask(conf, dsm.getContainer(), address);
@@ -98,7 +98,7 @@ public static DatanodeStateMachine startPreFinalizedDatanode(
 
   public static DatanodeStateMachine restartDatanode(
       OzoneConfiguration conf, DatanodeStateMachine dsm, boolean 
shouldSetDbParentDir,
-      Path tempFolder, InetSocketAddress address, int expectedMlv, boolean 
exactMatch)
+      Path tempFolder, InetSocketAddress address, int expectedApparentVersion, 
boolean exactMatch)
       throws Exception {
     // Stop existing datanode.
     DatanodeDetails dd = dsm.getDatanodeDetails();
@@ -110,11 +110,11 @@ public static DatanodeStateMachine restartDatanode(
       
StorageVolumeUtil.getHddsVolumesList(dsm.getContainer().getVolumeSet().getVolumesList())
           .forEach(hddsVolume -> 
hddsVolume.setDbParentDir(tempFolder.toFile()));
     }
-    int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+    int apparentVersion = 
dsm.getVersionManager().getApparentVersion().serialize();
     if (exactMatch) {
-      assertEquals(expectedMlv, mlv);
+      assertEquals(expectedApparentVersion, apparentVersion);
     } else {
-      assertThat(expectedMlv).isLessThanOrEqualTo(mlv);
+      assertThat(expectedApparentVersion).isLessThanOrEqualTo(apparentVersion);
     }
 
     callVersionEndpointTask(conf, dsm.getContainer(), address);
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSVersionManager.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSVersionManager.java
index 576655a8ff6..9d01346bcd3 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSVersionManager.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSVersionManager.java
@@ -27,8 +27,8 @@
 /**
  * Component version manager for HDDS (Datanodes and SCM).
  */
-public class HDDSVersionManager extends ComponentVersionManager {
-  public HDDSVersionManager(Storage storage) throws IOException {
+public abstract class HDDSVersionManager extends ComponentVersionManager {
+  protected HDDSVersionManager(Storage storage) throws IOException {
     super(storage, computeApparentVersion(storage.getApparentVersion()), 
HDDSVersion.SOFTWARE_VERSION);
   }
 
@@ -58,7 +58,5 @@ private static ComponentVersion computeApparentVersion(int 
serializedApparentVer
   }
 
   @Override
-  protected void runUpgradeAction(ComponentVersion componentVersion) throws 
UpgradeException {
-    // TODO HDDS-15129: Register upgrade actions based on annotations
-  }
+  protected abstract void runUpgradeAction(ComponentVersion version) throws 
UpgradeException;
 }
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSVersionManager.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSVersionManager.java
deleted file mode 100644
index 77a3720a25b..00000000000
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSVersionManager.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.upgrade;
-
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
-import org.apache.hadoop.hdds.ComponentVersion;
-import org.apache.hadoop.hdds.HDDSVersion;
-import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.ozone.upgrade.AbstractComponentVersionManagerTest;
-import org.apache.hadoop.ozone.upgrade.ComponentVersionManager;
-import org.apache.ozone.test.tag.Unhealthy;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.Mockito;
-
-/**
- * Tests for {@link HDDSVersionManager} using on-disk {@link Storage} under a 
JUnit temp directory.
- */
-class TestHDDSVersionManager extends AbstractComponentVersionManagerTest {
-
-  private static final List<ComponentVersion> ALL_VERSIONS;
-
-  static {
-    ALL_VERSIONS = new ArrayList<>(Arrays.asList(HDDSLayoutFeature.values()));
-
-    for (HDDSVersion version : HDDSVersion.values()) {
-      // Add all defined versions after and including ZDU to get the complete 
version list.
-      if (HDDSVersion.ZDU.isSupportedBy(version) && version != 
HDDSVersion.FUTURE_VERSION) {
-        ALL_VERSIONS.add(version);
-      }
-    }
-  }
-
-  public static Stream<Arguments> preFinalizedVersionArgs() {
-    return ALL_VERSIONS.stream()
-        .limit(ALL_VERSIONS.size() - 1)
-        .map(Arguments::of);
-  }
-
-  @ParameterizedTest
-  @MethodSource("preFinalizedVersionArgs")
-  @Override
-  @Unhealthy
-  public void testFinalizationFromEarlierVersions(ComponentVersion 
apparentVersion) {
-    // TODO Once HDDSVersionManager implementation is finished, this override 
can be removed to enable the test in the
-    //  parent class.
-  }
-
-  @Override
-  protected ComponentVersionManager createManager(int 
serializedApparentVersion) throws IOException {
-    Storage mockStorage = Mockito.mock(Storage.class);
-    
when(mockStorage.getApparentVersion()).thenReturn(serializedApparentVersion);
-    return new HDDSVersionManager(mockStorage);
-  }
-
-  @Override
-  protected List<ComponentVersion> allVersionsInOrder() {
-    return ALL_VERSIONS;
-  }
-
-  @Override
-  protected ComponentVersion expectedSoftwareVersion() {
-    return HDDSVersion.SOFTWARE_VERSION;
-  }
-}
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/ozone/upgrade/AbstractComponentVersionManagerTest.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/ozone/upgrade/AbstractComponentVersionManagerTest.java
index b5a38623faf..9eb0eb7e3de 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/ozone/upgrade/AbstractComponentVersionManagerTest.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/ozone/upgrade/AbstractComponentVersionManagerTest.java
@@ -56,6 +56,18 @@ public abstract class AbstractComponentVersionManagerTest {
 
   protected abstract ComponentVersion expectedSoftwareVersion();
 
+  @Test
+  public abstract void testClasspathScanDiscoversUpgradeActions() throws 
Exception;
+
+  @Test
+  public abstract void testFinalizeRunsSuppliedUpgradeAction() throws 
Exception;
+
+  @Test
+  public abstract void testUpgradeActionFailureAbortsFinalize() throws 
Exception;
+
+  @Test
+  public abstract void testPersistFailureRollsBack() throws Exception;
+
   @AfterEach
   public void cleanupMetricsSource() {
     
DefaultMetricsSystem.instance().unregisterSource(ComponentVersionManagerMetrics.METRICS_SOURCE_NAME);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 4f4ef273347..22fd52835d9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -50,6 +50,7 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.management.ObjectName;
+import org.apache.hadoop.hdds.HDDSVersion;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -91,7 +92,7 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import 
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.FinalizeVersionCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -155,6 +156,17 @@ public class SCMNodeManager implements NodeManager {
   private static final String DNUUID = "UUID";
   private static final String VERSION = "VERSION";
 
+  /**
+   * TODO HDDS-15129 Remove when SCM uses the new versioning framework
+   * Datanodes on {@link HDDSVersion} report {@link HDDSVersion#ZDU} as 
software version ({@code 100}), while
+   * SCM still on {@link org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature} 
reports the legacy maximum ({@code 10}
+   * today).
+   * Without this bridge, registration fails and heartbeats log {@code dnSlv > 
scmSlv} as an invalid node.
+   */
+  private static boolean shouldFenceDatanode(int dnSoftwareVersion, int 
scmSoftwareVersion) {
+    return dnSoftwareVersion > scmSoftwareVersion && dnSoftwareVersion != 
HDDSVersion.ZDU.serialize();
+  }
+
   /**
    * Constructs SCM machine Manager.
    */
@@ -396,8 +408,9 @@ public RegisteredCommand register(
       DatanodeDetails datanodeDetails, NodeReportProto nodeReport,
       PipelineReportsProto pipelineReportsProto,
       LayoutVersionProto layoutInfo) {
-    if (layoutInfo.getSoftwareLayoutVersion() !=
-        scmLayoutVersionManager.getSoftwareLayoutVersion()) {
+    int dnSlvRegister = layoutInfo.getSoftwareLayoutVersion();
+    int scmSlvRegister = scmLayoutVersionManager.getSoftwareLayoutVersion();
+    if (shouldFenceDatanode(dnSlvRegister, scmSlvRegister)) {
       return RegisteredCommand.newBuilder()
           .setErrorCode(ErrorCode.errorNodeNotPermitted)
           .setDatanode(datanodeDetails)
@@ -758,7 +771,9 @@ protected void 
sendFinalizeToDatanodeIfNeeded(DatanodeDetails datanodeDetails,
 
     // A datanode with a larger software layout version is from a future
     // version of ozone. It should not have been added to the cluster.
-    if (dnSlv > scmSlv) {
+    // TODO HDDS-15129 REMOVE WHEN SCM USES new versioning framework.
+    //  For now, do not treat datanodes with ZDU future software version as 
invalid.
+    if (shouldFenceDatanode(dnSlv, scmSlv)) {
       LOG.error("Invalid data node in the cluster : {}. " +
               "DataNode SoftwareLayoutVersion = {}, SCM " +
               "SoftwareLayoutVersion = {}",
@@ -781,8 +796,8 @@ protected void 
sendFinalizeToDatanodeIfNeeded(DatanodeDetails datanodeDetails,
     LOG.warn("Data node {} has a MetadataLayoutVersion = {}, SCM 
MetadataLayoutVersion = {}. Sending finalize",
         datanodeDetails.getHostName(), dnMlv, scmMlv);
 
-    FinalizeNewLayoutVersionCommand finalizeCmd =
-        new FinalizeNewLayoutVersionCommand(true,
+    FinalizeVersionCommand finalizeCmd =
+        new FinalizeVersionCommand(true,
             LayoutVersionProto.newBuilder()
                 .setSoftwareLayoutVersion(dnSlv)
                 .setMetadataLayoutVersion(dnSlv).build());
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index dd403f46a18..f1c4c78168f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -94,7 +94,7 @@
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
-import 
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.FinalizeVersionCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -423,7 +423,7 @@ public static SCMCommandProto 
getCommandResponse(SCMCommand<?> cmd,
       return builder
             .setCommandType(finalizeNewLayoutVersionCommand)
             .setFinalizeNewLayoutVersionCommandProto(
-                ((FinalizeNewLayoutVersionCommand)cmd).getProto())
+                ((FinalizeVersionCommand)cmd).getProto())
             .build();
     case refreshVolumeUsageInfo:
       return builder
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 6acb59eecf0..45faab72f01 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -187,7 +187,6 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
-import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.lease.LeaseManagerNotRunningException;
 import org.apache.hadoop.ozone.upgrade.DefaultUpgradeFinalizationExecutor;
@@ -705,7 +704,6 @@ private void initializeSystemManagers(OzoneConfiguration 
conf,
 
     scmLayoutVersionManager = new HDDSLayoutVersionManager(
         scmStorageConfig.getApparentVersion(), new ScmUpgradeActionProvider(), 
null);
-    VersionedDatanodeFeatures.initialize(scmLayoutVersionManager);
 
     UpgradeFinalizationExecutor<SCMUpgradeFinalizationContext>
         finalizationExecutor;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 04b6751b8c4..543b97530cd 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -137,19 +137,19 @@ public class TestSCMNodeManager {
   private StorageContainerManager scm;
   private SCMContext scmContext;
 
-  private static final int MAX_LV = 
HDDSLayoutVersionManager.maxLayoutVersion();
-  private static final LayoutVersionProto LARGER_SLV_LAYOUT_PROTO =
-      toLayoutVersionProto(MAX_LV, MAX_LV + 1);
-  private static final LayoutVersionProto SMALLER_MLV_LAYOUT_PROTO =
-      toLayoutVersionProto(MAX_LV - 1, MAX_LV);
+  private static final int MAX_SOFTWARE_VERSION = 
HDDSLayoutVersionManager.maxLayoutVersion();
+  private static final LayoutVersionProto LARGER_SOFTWARE_PROTO =
+      toLayoutVersionProto(MAX_SOFTWARE_VERSION, MAX_SOFTWARE_VERSION + 1);
+  private static final LayoutVersionProto SMALLER_APPARENT_VERSION_PROTO =
+      toLayoutVersionProto(MAX_SOFTWARE_VERSION - 1, MAX_SOFTWARE_VERSION);
   // In a real cluster, startup is disallowed if MLV is larger than SLV, so
   // increase both numbers to test smaller SLV or larger MLV.
-  private static final LayoutVersionProto SMALLER_MLV_SLV_LAYOUT_PROTO =
-      toLayoutVersionProto(MAX_LV - 1, MAX_LV - 1);
-  private static final LayoutVersionProto LARGER_MLV_SLV_LAYOUT_PROTO =
-      toLayoutVersionProto(MAX_LV + 1, MAX_LV + 1);
-  private static final LayoutVersionProto CORRECT_LAYOUT_PROTO =
-      toLayoutVersionProto(MAX_LV, MAX_LV);
+  private static final LayoutVersionProto SMALLER_ALL_VERSIONS_PROTO =
+      toLayoutVersionProto(MAX_SOFTWARE_VERSION - 1, MAX_SOFTWARE_VERSION - 1);
+  private static final LayoutVersionProto LARGER_ALL_VERSIONS_PROTO =
+      toLayoutVersionProto(MAX_SOFTWARE_VERSION + 1, MAX_SOFTWARE_VERSION + 1);
+  private static final LayoutVersionProto MATCHING_VERSION_PROTO =
+      toLayoutVersionProto(MAX_SOFTWARE_VERSION, MAX_SOFTWARE_VERSION);
 
   @BeforeEach
   public void setup() {
@@ -310,8 +310,10 @@ private DatanodeDetails 
registerWithCapacity(SCMNodeManager nodeManager,
    * @throws InterruptedException
    * @throws TimeoutException
    */
-  @Test
-  public void testScmLayoutOnRegister()
+  // TODO HDDS-15129 This test will need to be updated to check registration 
conditions depending on SCM's
+  //  finalization state.
+  // @Test
+  public void testScmVersionOnRegister()
       throws Exception {
 
     OzoneConfiguration conf = getConf();
@@ -322,19 +324,19 @@ public void testScmLayoutOnRegister()
       assertTrue(scm.getScmContext().isLeader());
       // Nodes with mismatched SLV cannot join the cluster.
       registerWithCapacity(nodeManager,
-          LARGER_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
+          LARGER_SOFTWARE_PROTO, errorNodeNotPermitted);
       registerWithCapacity(nodeManager,
-          SMALLER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
+          SMALLER_ALL_VERSIONS_PROTO, errorNodeNotPermitted);
       registerWithCapacity(nodeManager,
-          LARGER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
+          LARGER_ALL_VERSIONS_PROTO, errorNodeNotPermitted);
       // Nodes with mismatched MLV can join
       DatanodeDetails badMlvNode1 = registerWithCapacity(nodeManager,
-          SMALLER_MLV_LAYOUT_PROTO, success);
+          SMALLER_APPARENT_VERSION_PROTO, success);
       DatanodeDetails badMlvNode2 = registerWithCapacity(nodeManager,
-          SMALLER_MLV_LAYOUT_PROTO, success);
+          SMALLER_APPARENT_VERSION_PROTO, success);
       // This node has correct MLV and SLV
       DatanodeDetails goodNode = registerWithCapacity(nodeManager,
-          CORRECT_LAYOUT_PROTO, success);
+          MATCHING_VERSION_PROTO, success);
 
       assertEquals(3, nodeManager.getAllNodes().size());
 
@@ -738,7 +740,7 @@ public void testProcessLayoutVersion() throws IOException {
   }
 
   @Test
-  public void testDatanodeFinalizedCounterTracksLayoutVersionReports()
+  public void testDatanodeFinalizedCounterTracksVersionReports()
       throws IOException, AuthenticationException {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       DatanodeDetails node =
@@ -776,13 +778,13 @@ public void 
testDatanodeFinalizedCounterTracksRegistrationAndRemoveNode()
       throws IOException, AuthenticationException, NodeNotFoundException {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       DatanodeDetails finalizedNode =
-          registerWithCapacity(nodeManager, CORRECT_LAYOUT_PROTO, success);
+          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
       assertEquals(1, nodeManager.getDatanodeFinalizationCounts()
               .getNumFinalizedDatanodes(),
           "Finalized registration should increment finalized count");
 
       DatanodeDetails nonFinalizedNode =
-          registerWithCapacity(nodeManager, SMALLER_MLV_LAYOUT_PROTO, success);
+          registerWithCapacity(nodeManager, SMALLER_APPARENT_VERSION_PROTO, 
success);
       assertEquals(1, nodeManager.getDatanodeFinalizationCounts()
               .getNumFinalizedDatanodes(),
           "Non-finalized registration should not increment finalized count");
@@ -822,10 +824,10 @@ public void 
testDatanodeFinalizedCounterExcludesNonHealthyNodes(NodeStatus expec
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       // transitionNode stops heartbeating and will become STALE or DEAD
       DatanodeDetails transitionNode =
-          registerWithCapacity(nodeManager, CORRECT_LAYOUT_PROTO, success);
+          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
       // heartbeatingNode keeps heartbeating as a healthy baseline
       DatanodeDetails heartbeatingNode =
-          registerWithCapacity(nodeManager, CORRECT_LAYOUT_PROTO, success);
+          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
 
       nodeManager.processHeartbeat(transitionNode);
       nodeManager.processHeartbeat(heartbeatingNode);
@@ -866,7 +868,7 @@ public void 
testDatanodeFinalizedCounterIncludesAllHealthyOpStates(
       throws IOException, AuthenticationException, NodeNotFoundException {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       DatanodeDetails node =
-          registerWithCapacity(nodeManager, CORRECT_LAYOUT_PROTO, success);
+          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
       nodeManager.setNodeOperationalState(node, opState);
 
       // All HEALTHY nodes should be counted regardless of operational state
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 229d12f5be0..c9a66f7c56c 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.container.common;
 
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
 import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createEndpoint;
 import static 
org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultLayoutVersionProto;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -39,6 +38,7 @@
 import java.util.Map;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HDDSVersion;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeID;
@@ -60,7 +60,6 @@
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ipc_.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -82,6 +81,7 @@
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
+import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
@@ -118,7 +118,7 @@ static void setUp() throws Exception {
     ozoneConf = SCMTestUtils.getConf(testDir);
     scmServerImpl = new ScmTestMock();
     dnDetails = randomDatanodeDetails();
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(ozoneConf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(ozoneConf,
         UUID.randomUUID().toString(),
         HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion());
     layoutStorage.initialize();
@@ -287,8 +287,8 @@ public void testDnLayoutVersionFile() throws Exception {
 
       // After the version call, the datanode layout file should
       // have its clusterID field set to the clusterID of the scm
-      DatanodeLayoutStorage layout
-          = new DatanodeLayoutStorage(ozoneConf,
+      DatanodeStorage layout
+          = new DatanodeStorage(ozoneConf,
           "na_expect_storage_initialized");
       assertEquals(scmServerImpl.getClusterId(), layout.getClusterID());
 
@@ -310,8 +310,8 @@ public void testDnLayoutVersionFile() throws Exception {
       // not update its clusterID field.
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       versionTask.call();
-      DatanodeLayoutStorage layout1
-          = new DatanodeLayoutStorage(ozoneConf,
+      DatanodeStorage layout1
+          = new DatanodeStorage(ozoneConf,
           "na_expect_storage_initialized");
 
       assertEquals("different_cluster_id", layout1.getClusterID());
@@ -321,7 +321,7 @@ public void testDnLayoutVersionFile() throws Exception {
       FileUtils.forceDelete(layout1.getVersionFile());
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       versionTask.call();
-      assertEquals(StorageState.NOT_INITIALIZED, new 
DatanodeLayoutStorage(ozoneConf, "any").getState());
+      assertEquals(StorageState.NOT_INITIALIZED, new 
DatanodeStorage(ozoneConf, "any").getState());
 
       FileUtils.forceDelete(storageDir);
     }
@@ -432,15 +432,18 @@ private RegisterEndpointTask 
getRegisterEndpointTask(boolean clearDatanodeDetail
     when(ozoneContainer.getController()).thenReturn(controller);
     when(ozoneContainer.getPipelineReport()).thenReturn(
         HddsTestUtils.getRandomPipelineReports());
-    HDDSLayoutVersionManager versionManager =
-        mock(HDDSLayoutVersionManager.class);
-    when(versionManager.getMetadataLayoutVersion())
-        .thenReturn(maxLayoutVersion());
-    when(versionManager.getSoftwareLayoutVersion())
-        .thenReturn(maxLayoutVersion());
+    DatanodeVersionManager versionManager =
+        mock(DatanodeVersionManager.class);
+    when(versionManager.getApparentVersion())
+        .thenReturn(HDDSVersion.SOFTWARE_VERSION);
+    when(versionManager.getSoftwareVersion())
+        .thenReturn(HDDSVersion.SOFTWARE_VERSION);
+    DatanodeStateMachine dn = mock(DatanodeStateMachine.class);
+    when(dn.getVersionManager()).thenReturn(versionManager);
+    StateContext context = mock(StateContext.class);
+    when(context.getParent()).thenReturn(dn);
     RegisterEndpointTask endpointTask =
-        new RegisterEndpointTask(rpcEndPoint, ozoneContainer,
-            mock(StateContext.class), versionManager);
+        new RegisterEndpointTask(rpcEndPoint, ozoneContainer, context);
     if (!clearDatanodeDetails) {
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
       endpointTask.setDatanodeDetails(datanodeDetails);
@@ -624,8 +627,7 @@ private StateContext heartbeatTaskHelper(
               stateMachine, "");
 
       HeartbeatEndpointTask endpointTask =
-          new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext,
-              stateMachine.getLayoutVersionManager());
+          new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext);
       endpointTask.setDatanodeDetailsProto(datanodeDetailsProto);
       endpointTask.call();
       assertNotNull(endpointTask.getDatanodeDetailsProto());
diff --git 
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/upgrade/UpgradeUtil.java
 
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/upgrade/UpgradeUtil.java
deleted file mode 100644
index 1e248275fd5..00000000000
--- 
a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/upgrade/UpgradeUtil.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.admin.upgrade;
-
-import java.io.IOException;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-
-/**
- * Static utility methods for upgrade commands.
- */
-public final class UpgradeUtil {
-
-  private UpgradeUtil() { }
-
-  /** Utility method to test if the upgrade has completed (finalized) or not.
-   *
-   * @param scmClient
-   * @return True if the cluster has completed the upgrade and is finalized.
-   * @throws IOException
-   */
-  public static boolean isFinalizationComplete(
-      StorageContainerLocationProtocol scmClient) throws IOException {
-    return scmClient.queryUpgradeStatus().getShouldFinalize();
-  }
-
-}
diff --git 
a/hadoop-ozone/cli-repair/src/main/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/UpgradeUtils.java
 
b/hadoop-ozone/cli-repair/src/main/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/UpgradeUtils.java
index 9ec6852b3e4..164f91b6a32 100644
--- 
a/hadoop-ozone/cli-repair/src/main/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/UpgradeUtils.java
+++ 
b/hadoop-ozone/cli-repair/src/main/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/UpgradeUtils.java
@@ -35,7 +35,7 @@
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -95,8 +95,8 @@ public static boolean createFile(File file) throws 
IOException {
 
   public static Pair<HDDSLayoutFeature, HDDSLayoutFeature> getLayoutFeature(
       DatanodeDetails dnDetail, OzoneConfiguration conf) throws IOException {
-    DatanodeLayoutStorage layoutStorage =
-        new DatanodeLayoutStorage(conf, dnDetail.getUuidString());
+    DatanodeStorage layoutStorage =
+        new DatanodeStorage(conf, dnDetail.getUuidString());
     HDDSLayoutVersionManager layoutVersionManager =
         new HDDSLayoutVersionManager(layoutStorage.getApparentVersion(), null, 
null);
 
diff --git 
a/hadoop-ozone/cli-repair/src/test/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/TestUpgradeContainerSchema.java
 
b/hadoop-ozone/cli-repair/src/test/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/TestUpgradeContainerSchema.java
index f8983e23a9e..351b2d0477b 100644
--- 
a/hadoop-ozone/cli-repair/src/test/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/TestUpgradeContainerSchema.java
+++ 
b/hadoop-ozone/cli-repair/src/test/java/org/apache/hadoop/ozone/repair/datanode/schemaupgrade/TestUpgradeContainerSchema.java
@@ -67,7 +67,7 @@
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -137,7 +137,7 @@ void setup(@TempDir Path testRoot) throws Exception {
   }
 
   private void initDatanode(HDDSLayoutFeature layoutFeature) throws 
IOException {
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         datanodeDetails.getUuidString(),
         layoutFeature.layoutVersion());
     layoutStorage.initialize();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestDNDataDistributionFinalization.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestDNDataDistributionFinalization.java
index 268f5a2dfc7..6ab15c96aee 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestDNDataDistributionFinalization.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestDNDataDistributionFinalization.java
@@ -213,7 +213,7 @@ private void validatePreDataDistributionFeatureState() {
     assertTrue(!isDataDistributionFinalized ||
             // In test environment, version manager might be null
             cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-                .getLayoutVersionManager() == null,
+                .getVersionManager() == null,
         "STORAGE_SPACE_DISTRIBUTION should not be finalized in pre-upgrade 
state");
 
     // Verify containers exist and have pending deletion metadata
@@ -227,7 +227,7 @@ private void validatePostDataDistributionFeatureState() {
     assertTrue(isDataDistributionFinalized ||
             // In test environment, version manager might be null
             cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-                .getLayoutVersionManager() == null,
+                .getVersionManager() == null,
         "STORAGE_SPACE_DISTRIBUTION should be finalized in post-upgrade 
state");
 
     // Verify containers can handle post-finalization pending deletion logic
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
index 9c2e8c7514a..bc9a71b72ac 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
@@ -17,25 +17,19 @@
 
 package org.apache.hadoop.hdds.upgrade;
 
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.ALREADY_FINALIZED;
-import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.FINALIZATION_DONE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.fail;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.HddsDatanodeService;
-import org.apache.hadoop.ozone.admin.upgrade.UpgradeUtil;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.slf4j.Logger;
@@ -52,9 +46,9 @@ private TestHddsUpgradeUtils() { }
 
   public static void 
waitForFinalizationFromClient(StorageContainerLocationProtocol scmClient) 
throws Exception {
     LambdaTestUtils.await(60_000, 1_000, () -> {
-      boolean isComplete = UpgradeUtil.isFinalizationComplete(scmClient);
-      LOG.info("Waiting for upgrade finalization to complete from 
client.Current status is {}.", isComplete);
-      return isComplete;
+      HddsProtos.UpgradeStatus status = scmClient.queryUpgradeStatus();
+      LOG.info("Waiting for upgrade finalization to complete from client. 
Current status is:\n{}", status);
+      return status.getShouldFinalize();
     });
   }
 
@@ -96,26 +90,6 @@ public static void 
testPostUpgradeConditionsSCM(StorageContainerManager scm,
     assertThat(countContainers).isGreaterThanOrEqualTo(numContainers);
   }
 
-  /*
-   * Helper function to test Pre-Upgrade conditions on all the DataNodes.
-   */
-  public static void testPreUpgradeConditionsDataNodes(
-      List<HddsDatanodeService> datanodes) {
-    for (HddsDatanodeService dataNode : datanodes) {
-      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-      HDDSLayoutVersionManager dnVersionManager =
-          dsm.getLayoutVersionManager();
-      assertEquals(0, dnVersionManager.getMetadataLayoutVersion());
-    }
-
-    int countContainers = 0;
-    for (HddsDatanodeService dataNode : datanodes) {
-      DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-      countContainers += dsm.getContainer().getContainerSet().containerCount();
-    }
-    assertThat(countContainers).isGreaterThanOrEqualTo(1);
-  }
-
   /*
    * Helper function to test Post-Upgrade conditions on all the DataNodes.
    */
@@ -124,14 +98,8 @@ public static void testPostUpgradeConditionsDataNodes(
     try {
       GenericTestUtils.waitFor(() -> {
         for (HddsDatanodeService dataNode : datanodes) {
-          DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-          try {
-            if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) &&
-                (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) {
-              return false;
-            }
-          } catch (IOException e) {
-            LOG.error("Failed to query datanode upgrade status.", e);
+          DatanodeVersionManager versionManager = 
dataNode.getDatanodeStateMachine().getVersionManager();
+          if (versionManager.needsFinalization()) {
             return false;
           }
         }
@@ -144,42 +112,14 @@ public static void testPostUpgradeConditionsDataNodes(
     int countContainers = 0;
     for (HddsDatanodeService dataNode : datanodes) {
       DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
-      HDDSLayoutVersionManager dnVersionManager =
-          dsm.getLayoutVersionManager();
-      assertEquals(dnVersionManager.getSoftwareLayoutVersion(),
-          dnVersionManager.getMetadataLayoutVersion());
-      
assertThat(dnVersionManager.getMetadataLayoutVersion()).isGreaterThanOrEqualTo(1);
+      DatanodeVersionManager dnVersionManager =
+          dsm.getVersionManager();
+      assertEquals(dnVersionManager.getSoftwareVersion(),
+          dnVersionManager.getApparentVersion());
+      assertThat(dnVersionManager.getApparentVersion().serialize())
+          .isGreaterThanOrEqualTo(1);
       countContainers += dsm.getContainer().getContainerSet().containerCount();
     }
     assertThat(countContainers).isGreaterThanOrEqualTo(numContainers);
   }
-
-  public static void testDataNodesStateOnSCM(List<StorageContainerManager> 
scms,
-      int expectedDatanodeCount, HddsProtos.NodeState state) {
-    scms.forEach(scm -> testDataNodesStateOnSCM(scm, expectedDatanodeCount, 
state));
-  }
-
-  /*
-   * Helper function to test DataNode state on the SCM. Note that due to
-   * timing constraints, sometime the node-state can transition to the next
-   * state. This function expects the DataNode to be in NodeState "state" or
-   * "alternateState". Some tests can enforce a unique NodeState test by
-   * setting "alternateState = null".
-   */
-  public static void testDataNodesStateOnSCM(StorageContainerManager scm,
-      int expectedDatanodeCount, HddsProtos.NodeState state) {
-    int countNodes = 0;
-    for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) {
-      try {
-        HddsProtos.NodeState dnState =
-            scm.getScmNodeManager().getNodeStatus(dn).getHealth();
-        assertSame(state, dnState);
-      } catch (NodeNotFoundException e) {
-        e.printStackTrace();
-        fail("Node not found");
-      }
-      ++countNodes;
-    }
-    assertEquals(expectedDatanodeCount, countNodes);
-  }
 }
diff --git 
a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/UniformDatanodesFactory.java
 
b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/UniformDatanodesFactory.java
index 22d7dc617bb..c77f087671b 100644
--- 
a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/UniformDatanodesFactory.java
+++ 
b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/UniformDatanodesFactory.java
@@ -48,7 +48,7 @@
 import org.apache.hadoop.hdds.HDDSVersion;
 import org.apache.hadoop.hdds.conf.ConfigurationTarget;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 
 /**
@@ -105,7 +105,7 @@ public OzoneConfiguration apply(OzoneConfiguration conf) 
throws IOException {
     dnConf.set(HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, ratisDir.toString());
 
     if (layoutVersion != null) {
-      DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(
+      DatanodeStorage layoutStorage = new DatanodeStorage(
           dnConf, UUID.randomUUID().toString(), layoutVersion);
       layoutStorage.initialize();
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMVersionManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMVersionManager.java
index 184cbe883e5..99bbab840e9 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMVersionManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMVersionManager.java
@@ -50,6 +50,7 @@ public OMVersionManager(OMStorage storage, OzoneManager 
upgradeActionArg) throws
     this(storage, upgradeActionArg, new OMUpgradeActionProvider());
   }
 
+  @VisibleForTesting
   public OMVersionManager(OMStorage storage, OzoneManager upgradeActionArg,
       ComponentUpgradeActionProvider<OmUpgradeAction> upgradeActionProvider) 
throws IOException {
     super(storage, computeApparentVersion(storage.getApparentVersion()), 
OzoneManagerVersion.SOFTWARE_VERSION);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
index 9e89a7780de..2565109a73f 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
@@ -124,6 +124,7 @@ protected ComponentVersion expectedSoftwareVersion() {
     return OzoneManagerVersion.SOFTWARE_VERSION;
   }
 
+  @Override
   @Test
   public void testClasspathScanDiscoversUpgradeActions() throws Exception {
     // Regardless of whether OM is finalized, the same set of upgrade actions 
should be loaded.
@@ -140,6 +141,7 @@ public void testClasspathScanDiscoversUpgradeActions() 
throws Exception {
     }
   }
 
+  @Override
   @Test
   public void testFinalizeRunsSuppliedUpgradeAction() throws Exception {
     OmUpgradeAction mockECAction = mock(OmUpgradeAction.class);
@@ -163,6 +165,7 @@ public void testFinalizeRunsSuppliedUpgradeAction() throws 
Exception {
     }
   }
 
+  @Override
   @Test
   public void testUpgradeActionFailureAbortsFinalize() throws Exception {
     ComponentUpgradeActionProvider<OmUpgradeAction> provider = () -> {
@@ -183,6 +186,7 @@ public void testUpgradeActionFailureAbortsFinalize() throws 
Exception {
     }
   }
 
+  @Override
   @Test
   public void testPersistFailureRollsBack() throws Exception {
     // Create a mock storage instance that throws when persisting version 
updates.
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 07dd11d8021..330bd3adbe2 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -65,6 +65,7 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HDDSVersion;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -95,7 +96,6 @@
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -654,8 +654,7 @@ private void testDatanodeResponse(DatanodeMetadata 
datanodeMetadata)
       fail(String.format("Datanode %s not registered",
           hostname));
     }
-    assertEquals(HDDSLayoutVersionManager.maxLayoutVersion(),
-        datanodeMetadata.getLayoutVersion());
+    assertEquals(HDDSVersion.SOFTWARE_VERSION.serialize(), 
datanodeMetadata.getLayoutVersion());
   }
 
   @Test
diff --git 
a/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
 
b/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
index 797177090a1..5baf6cd14da 100644
--- 
a/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
+++ 
b/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
@@ -69,7 +69,6 @@
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.server.JsonUtils;
 import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.IOUtils;
@@ -79,8 +78,8 @@
 import org.apache.hadoop.ipc_.ProtobufRpcEngine;
 import org.apache.hadoop.ipc_.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.DatanodeStorage;
+import org.apache.hadoop.ozone.container.upgrade.DatanodeVersionManager;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocolPB.ReconDatanodeProtocolPB;
 import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
@@ -441,17 +440,16 @@ private void init() throws IOException {
   }
 
   private LayoutVersionProto createLayoutInfo() throws IOException {
-    Storage layoutStorage = new DatanodeLayoutStorage(conf,
+    DatanodeStorage layoutStorage = new DatanodeStorage(conf,
         UUID.randomUUID().toString());
 
-    HDDSLayoutVersionManager layoutVersionManager =
-        new HDDSLayoutVersionManager(layoutStorage.getApparentVersion(), null, 
null);
+    DatanodeVersionManager versionManager = new 
DatanodeVersionManager(layoutStorage, null);
 
     return LayoutVersionProto.newBuilder()
         .setMetadataLayoutVersion(
-            layoutVersionManager.getMetadataLayoutVersion())
+            versionManager.getApparentVersion().serialize())
         .setSoftwareLayoutVersion(
-            layoutVersionManager.getSoftwareLayoutVersion())
+            versionManager.getSoftwareVersion().serialize())
         .build();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to