This is an automated email from the ASF dual-hosted git repository.

errose28 pushed a commit to branch HDDS-14496-zdu
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-14496-zdu by this push:
     new cff07d54867 HDDS-15482. Add fencing based on datanode versions to SCM 
and Recon (#10504)
cff07d54867 is described below

commit cff07d548676ab1b617d84bd1bc41e5250d4f374
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Jun 16 14:51:12 2026 -0400

    HDDS-15482. Add fencing based on datanode versions to SCM and Recon (#10504)
    
    Co-authored-by: Cursor <[email protected]>
---
 .../states/endpoint/RegisterEndpointTask.java      |   9 +
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  54 ++--
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   | 319 ++++++++-------------
 .../hadoop/hdds/upgrade/TestScmHAFinalization.java |  39 +++
 .../hadoop/ozone/recon/scm/ReconNodeManager.java   |  46 ++-
 .../ozone/recon/scm/TestReconNodeManager.java      |  78 +++++
 6 files changed, 323 insertions(+), 222 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index 0492f5da79b..cc946694257 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.errorNodeNotPermitted;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -125,6 +126,14 @@ public EndpointStateMachine.EndPointStates call() throws 
Exception {
         Preconditions.assertEquals(datanodeDetails.getUuidString(), 
response.getDatanodeUUID(), "datanodeID");
         Preconditions.assertTrue(!StringUtils.isBlank(response.getClusterID()),
             "Invalid cluster ID in the response.");
+        if (response.getErrorCode() == errorNodeNotPermitted) {
+          LOG.error("SCM rejected this datanode's registration " +
+              "with software version {} and apparent version {}. " +
+              "This datanode will shut down. Check SCM logs for details and 
verify that the " +
+              "datanode software version is compatible with the cluster. ",
+              versionManager.getSoftwareVersion(), 
versionManager.getApparentVersion());
+          return 
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
+        }
         Preconditions.assertSame(success, response.getErrorCode(), 
"ErrorCode");
         if (response.hasHostname() && response.hasIpAddress()) {
           datanodeDetails.setHostName(response.getHostname());
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index c9309c6a995..bfd44b0b86f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -728,7 +728,7 @@ public void processNodeReport(DatanodeDetails 
datanodeDetails,
   public void processVersionReport(DatanodeDetails datanodeDetails,
                                    LayoutVersionProto versionReport) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing Layout Version report from [datanode={}]",
+      LOG.debug("Processing version report from [datanode={}]",
           datanodeDetails.getHostName());
     }
     if (LOG.isTraceEnabled()) {
@@ -741,7 +741,7 @@ public void processVersionReport(DatanodeDetails 
datanodeDetails,
       nodeStateManager.updateLastKnownVersionInfo(datanodeDetails,
           versionReport);
     } catch (NodeNotFoundException e) {
-      LOG.error("SCM trying to process Layout Version from an " +
+      LOG.error("SCM trying to process version from an " +
           "unregistered node {}.", datanodeDetails);
       return;
     }
@@ -758,6 +758,8 @@ protected void 
sendFinalizeToDatanodeIfNeeded(DatanodeDetails datanodeDetails,
     ComponentVersion scmSoftwareVersion = versionManager.getSoftwareVersion();
     ComponentVersion scmApparentVersion = versionManager.getApparentVersion();
 
+    // All versions are validated when Datanodes register after they restart.
+    // After that, their apparent version should only increase, and their 
software version should remain constant.
     if (shouldFenceDatanode(datanodeDetails, dnSoftwareVersion, 
dnApparentVersion)) {
       LOG.error("Invalid datanode in the cluster : {}. " +
               "Datanode software version = {}, " +
@@ -2024,33 +2026,39 @@ protected boolean shouldFenceDatanode(DatanodeDetails 
dnDetails, LayoutVersionPr
     return shouldFenceDatanode(dnDetails, dnSoftwareVersion, 
dnApparentVersion);
   }
 
-  /**
-   * TODO Update this method to fence datanodes based on their software and 
apparent version and log the results.
-   * For now, maintain the non-rolling upgrade requirement that DN and SCM 
must have the same software version.
-   * Datanodes still cannot have a higher apparent version than SCM.
-   */
-  private boolean shouldFenceDatanode(DatanodeDetails dnDetails, 
ComponentVersion softwareVersion,
-                                        ComponentVersion apparentVersion) {
-    // Check datanode software version against SCM.
-    if (!versionManager.getSoftwareVersion().equals(softwareVersion)) {
-      // TODO Once SCM implementation for ZDU is complete, Datanodes with 
lower software versions will be allowed as
-      //  long as SCM is pre-finalized.
-      LOG.error("Datanode {} with software version {} which does not match SCM 
software version {} will not be " +
-              "allowed to join the cluster. This requirement will be lifted 
when ZDU is complete.",
-          dnDetails, softwareVersion, versionManager.getSoftwareVersion());
+  private boolean shouldFenceDatanode(DatanodeDetails dnDetails, 
ComponentVersion dnSoftwareVersion,
+                                        ComponentVersion dnApparentVersion) {
+    ComponentVersion scmSoftwareVersion = versionManager.getSoftwareVersion();
+    ComponentVersion scmApparentVersion = versionManager.getApparentVersion();
+
+    // DN software newer than SCM violates upgrade order. SCM must always be 
upgraded first.
+    if (!dnSoftwareVersion.isSupportedBy(scmSoftwareVersion)) {
+      LOG.error("Datanode {} has software version {} which is newer than SCM 
software version {}. " +
+          "SCM must be upgraded before datanodes.",
+          dnDetails, dnSoftwareVersion, scmSoftwareVersion);
+      return true;
+    }
+
+    // If DN software is older than SCM and SCM is finalized, the old DNs must 
be upgraded before rejoining.
+    if (!scmSoftwareVersion.isSupportedBy(dnSoftwareVersion) && 
!versionManager.needsFinalization()) {
+      LOG.error("Datanode {} has software version {} which is older than SCM 
software version {} and SCM is " +
+              "finalized. Datanode must be upgraded to join the cluster.",
+          dnDetails, dnSoftwareVersion, scmSoftwareVersion);
       return true;
     }
 
-    // Check datanode apparent version against SCM.
-    if (!versionManager.isAllowed(apparentVersion)) {
-      // Datanodes can never have a higher apparent version than SCM.
-      LOG.error("Datanode {} with apparent version {} which is larger than 
SCM's apparent version {} will not be " +
-          "allowed to join the cluster.", dnDetails, apparentVersion, 
versionManager.getApparentVersion());
+    // DN apparent version cannot be higher than SCM apparent version. SCM 
must finalize first.
+    if (!versionManager.isAllowed(dnApparentVersion)) {
+      LOG.error("Datanode {} has apparent version {} which is higher than SCM 
apparent version {}. " +
+          "SCM must finalize before datanodes.",
+          dnDetails, dnApparentVersion, scmApparentVersion);
       return true;
     }
 
-    // Datanodes with lower apparent version than SCM are allowed in the 
cluster but will be instructed to finalize
-    // if SCM has finalized.
+    // Else, either:
+    // DN software is older than SCM but SCM is pre-finalized: expected since 
DNs are upgraded after SCM.
+    // DN software matches SCM and DN apparent version <= SCM apparent 
version: DN can register and will be given a
+    //  finalize command if needed.
     return false;
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index eb5ffda37ac..289b5db922a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -47,7 +47,7 @@
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -58,20 +58,17 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.ComponentVersion;
 import org.apache.hadoop.hdds.HDDSVersion;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.fs.SpaceUsageSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -93,7 +90,6 @@
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -126,37 +122,16 @@
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test the SCM Node Manager class.
  */
 public class TestSCMNodeManager {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestSCMNodeManager.class);
-
   private File testDir;
   private StorageContainerManager scm;
   private SCMContext scmContext;
 
-  private static final LayoutVersionProto LARGER_SOFTWARE_PROTO = 
LayoutVersionProto.newBuilder()
-      .setMetadataLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize())
-      .setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1)
-      .build();
-  private static final LayoutVersionProto SMALLER_APPARENT_VERSION_PROTO =
-      toVersionProto(HDDSLayoutFeature.SCM_HA, HDDSVersion.SOFTWARE_VERSION);
-  // In a real cluster, startup is disallowed if apparent version is larger 
than software version, so
-  // increase both numbers to test smaller software version or larger apparent 
version.
-  private static final LayoutVersionProto SMALLER_ALL_VERSIONS_PROTO =
-      toVersionProto(HDDSLayoutFeature.SCM_HA, HDDSLayoutFeature.SCM_HA);
-  private static final LayoutVersionProto LARGER_ALL_VERSIONS_PROTO = 
LayoutVersionProto.newBuilder()
-          .setMetadataLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 
1)
-          .setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 
1)
-          .build();
-  private static final LayoutVersionProto MATCHING_VERSION_PROTO = 
defaultVersionProto();
-
   @BeforeEach
   public void setup() {
     testDir = PathUtils.getTestDir(
@@ -308,103 +283,6 @@ private DatanodeDetails 
registerWithCapacity(SCMNodeManager nodeManager,
     return cmd.getDatanode();
   }
 
-  /**
-   * Tests that node manager handles layout versions for newly registered nodes
-   * correctly.
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws TimeoutException
-   */
-  // TODO HDDS-15129 This test will need to be updated to check registration 
conditions depending on SCM's
-  //  finalization state.
-  // @Test
-  public void testScmVersionOnRegister()
-      throws Exception {
-
-    OzoneConfiguration conf = getConf();
-    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
-        1, TimeUnit.DAYS);
-
-    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
-      assertTrue(scm.getScmContext().isLeader());
-      // Nodes with mismatched SLV cannot join the cluster.
-      registerWithCapacity(nodeManager,
-          LARGER_SOFTWARE_PROTO, errorNodeNotPermitted);
-      registerWithCapacity(nodeManager,
-          SMALLER_ALL_VERSIONS_PROTO, errorNodeNotPermitted);
-      registerWithCapacity(nodeManager,
-          LARGER_ALL_VERSIONS_PROTO, errorNodeNotPermitted);
-      // Nodes with mismatched MLV can join
-      DatanodeDetails badMlvNode1 = registerWithCapacity(nodeManager,
-          SMALLER_APPARENT_VERSION_PROTO, success);
-      DatanodeDetails badMlvNode2 = registerWithCapacity(nodeManager,
-          SMALLER_APPARENT_VERSION_PROTO, success);
-      // This node has correct MLV and SLV
-      DatanodeDetails goodNode = registerWithCapacity(nodeManager,
-          MATCHING_VERSION_PROTO, success);
-
-      assertEquals(3, nodeManager.getAllNodes().size());
-
-      scm.exitSafeMode();
-
-      assertPipelines(HddsProtos.ReplicationFactor.ONE,
-          count -> count == 3,
-          Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
-      assertPipelines(HddsProtos.ReplicationFactor.THREE,
-          count -> count >= 1,
-          Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
-    }
-  }
-
-  private void assertPipelines(HddsProtos.ReplicationFactor factor,
-      Predicate<Integer> countCheck, Collection<DatanodeDetails> allowedDNs)
-      throws Exception {
-
-    Set<String> allowedDnIds = allowedDNs.stream()
-        .map(DatanodeDetails::getUuidString)
-        .collect(Collectors.toSet());
-
-    RatisReplicationConfig replConfig = RatisReplicationConfig
-        .getInstance(factor);
-
-    // Wait for the expected number of pipelines using allowed DNs.
-    GenericTestUtils.waitFor(() -> {
-      // Closed pipelines are no longer in operation so we should not count
-      // them. We cannot check for open pipelines only because this is a mock
-      // test so the pipelines may remain in ALLOCATED state.
-      List<Pipeline> pipelines = scm.getPipelineManager()
-          .getPipelines(replConfig)
-          .stream()
-          .filter(p -> p.getPipelineState() != Pipeline.PipelineState.CLOSED)
-          .collect(Collectors.toList());
-      LOG.info("Found {} non-closed pipelines of type {} and factor {}.",
-          pipelines.size(),
-          replConfig.getReplicationType(), replConfig.getReplicationFactor());
-      boolean success = countCheck.test(pipelines.size());
-
-      // If we have the correct number of pipelines, make sure that none of
-      // these pipelines use nodes outside of allowedDNs.
-      if (success) {
-        for (Pipeline pipeline: pipelines) {
-          for (DatanodeDetails pipelineDN: pipeline.getNodes()) {
-            // Do not wait for this condition to be true. Disallowed DNs should
-            // never be used once we have the expected number of pipelines.
-            if (!allowedDnIds.contains(pipelineDN.getUuidString())) {
-              String message = String.format("Pipeline %s used datanode %s " +
-                      "which is not in the set of allowed datanodes: %s",
-                  pipeline.getId().toString(), pipelineDN.getUuidString(),
-                  allowedDnIds);
-              fail(message);
-            }
-          }
-        }
-      }
-
-      return success;
-    }, 1000, 10000);
-  }
-
   /**
    * asserts that if we send no heartbeats node manager stays in safemode.
    *
@@ -738,13 +616,6 @@ void testScmHandleJvmPause() throws Exception {
     }
   }
 
-  @Test
-  public void testProcessVersionReports() throws IOException {
-    testProcessVersionReportLowerApparentVersion(true);
-    testProcessVersionReportLowerApparentVersion(false);
-    testProcessVersionReportHigherApparentVersion();
-  }
-
   @Test
   public void testDatanodeFinalizedCounterTracksVersionReports()
       throws IOException, AuthenticationException {
@@ -783,13 +654,16 @@ public void 
testDatanodeFinalizedCounterTracksRegistrationAndRemoveNode()
       throws IOException, AuthenticationException, NodeNotFoundException {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       DatanodeDetails finalizedNode =
-          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+          registerWithCapacity(nodeManager, defaultVersionProto(), success);
       assertEquals(1, nodeManager.getDatanodeFinalizationCounts()
               .getNumFinalizedDatanodes(),
           "Finalized registration should increment finalized count");
 
+
+      LayoutVersionProto preFinalizedVersionProto =
+          toVersionProto(HDDSLayoutFeature.SCM_HA, 
HDDSVersion.SOFTWARE_VERSION);
       DatanodeDetails nonFinalizedNode =
-          registerWithCapacity(nodeManager, SMALLER_APPARENT_VERSION_PROTO, 
success);
+          registerWithCapacity(nodeManager, preFinalizedVersionProto, success);
       assertEquals(1, nodeManager.getDatanodeFinalizationCounts()
               .getNumFinalizedDatanodes(),
           "Non-finalized registration should not increment finalized count");
@@ -816,6 +690,88 @@ private static Stream<Arguments> ineligibleHealthStates() {
     );
   }
 
+  private static Stream<Arguments> scmDatanodeVersionCombinations() {
+    // Software version of SCM is always fixed at the latest. All other 
versions are relative to this.
+    return Stream.of(
+        /* SCM PRE-FINALIZED */
+
+        // Old DN accepted
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSLayoutFeature.INITIAL_VERSION),
+            success, false),
+        // Pre-finalized DN accepted
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSVersion.SOFTWARE_VERSION),
+            success, false),
+        // Finalized DN rejected
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            defaultVersionProto(),
+            errorNodeNotPermitted, false),
+        // Newer DN rejected, even though its apparent version matches SCM.
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            LayoutVersionProto.newBuilder()
+                
.setMetadataLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.serialize())
+                
.setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1).build(),
+            errorNodeNotPermitted, false),
+
+        /* SCM FINALIZED */
+
+        // Old DN rejected
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSLayoutFeature.INITIAL_VERSION),
+            errorNodeNotPermitted, false),
+        // Pre-finalized DN accepted but instructed to finalize
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSVersion.SOFTWARE_VERSION),
+            success, true),
+        // Finalized DN accepted
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            defaultVersionProto(),
+            success, false),
+        // Newer DN rejected, even though its apparent version matches SCM.
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            LayoutVersionProto.newBuilder()
+                
.setMetadataLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize())
+                
.setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1).build(),
+            errorNodeNotPermitted, false)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("scmDatanodeVersionCombinations")
+  public void testDatanodeFencingOnRegister(ComponentVersion scmApparent, 
LayoutVersionProto dnVersionProto,
+      ErrorCode expectedResult, boolean expectFinalizeCmd) throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
+    when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
+    EventPublisher eventPublisher = mock(EventPublisher.class);
+    ScmVersionManager versionManager = mockVersionManager(scmApparent);
+
+    try (SCMNodeManager nodeManager = new SCMNodeManager(conf, 
scmStorageConfig, eventPublisher,
+        new NetworkTopologyImpl(conf), SCMContext.emptyContext(), 
versionManager)) {
+      DatanodeDetails node = MockDatanodeDetails.randomDatanodeDetails();
+      StorageReportProto storageReport = HddsTestUtils.createStorageReport(
+          node.getID(), node.getNetworkFullPath(), Long.MAX_VALUE);
+      RegisteredCommand cmd = nodeManager.register(node,
+          
HddsTestUtils.createNodeReport(Collections.singletonList(storageReport), 
emptyList()),
+          getRandomPipelineReports(), dnVersionProto);
+
+      assertEquals(expectedResult, cmd.getError());
+
+      if (expectedResult == success) {
+        ArgumentCaptor<CommandForDatanode> captor = 
ArgumentCaptor.forClass(CommandForDatanode.class);
+        nodeManager.processVersionReport(node, dnVersionProto);
+        if (expectFinalizeCmd) {
+          verify(eventPublisher, times(1)).fireEvent(eq(DATANODE_COMMAND), 
captor.capture());
+          assertEquals(node.getID(), captor.getValue().getDatanodeId());
+          assertEquals(finalizeNewLayoutVersionCommand, 
captor.getValue().getCommand().getType());
+        } else {
+          verify(eventPublisher, times(0)).fireEvent(eq(DATANODE_COMMAND), 
captor.capture());
+        }
+      }
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("ineligibleHealthStates")
   public void testDatanodeFinalizedCounterExcludesNonHealthyNodes(NodeStatus 
expectedStatus)
@@ -829,10 +785,10 @@ public void 
testDatanodeFinalizedCounterExcludesNonHealthyNodes(NodeStatus expec
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       // transitionNode stops heartbeating and will become STALE or DEAD
       DatanodeDetails transitionNode =
-          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+          registerWithCapacity(nodeManager, defaultVersionProto(), success);
       // heartbeatingNode keeps heartbeating as a healthy baseline
       DatanodeDetails heartbeatingNode =
-          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+          registerWithCapacity(nodeManager, defaultVersionProto(), success);
 
       nodeManager.processHeartbeat(transitionNode);
       nodeManager.processHeartbeat(heartbeatingNode);
@@ -873,7 +829,7 @@ public void 
testDatanodeFinalizedCounterIncludesAllHealthyOpStates(
       throws IOException, AuthenticationException, NodeNotFoundException {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       DatanodeDetails node =
-          registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+          registerWithCapacity(nodeManager, defaultVersionProto(), success);
       nodeManager.setNodeOperationalState(node, opState);
 
       // All HEALTHY nodes should be counted regardless of operational state
@@ -884,81 +840,52 @@ public void 
testDatanodeFinalizedCounterIncludesAllHealthyOpStates(
     }
   }
 
-  // Currently invoked by testProcessLayoutVersion.
+  /**
+   * A datanode's software version can only increase with a restart, but its 
apparent version can change while it is
+   * running and registered to SCM. This is not supposed to happen until SCM's 
apparent version has changed and it
+   * instructs a datanode to finalize, but this test covers a defensive check 
that SCM will not send commands to a
+   * datanode that somehow ends up with a higher apparent version.
+   *
+   * There is currently no way for SCM to remove a Datanode from the cluster 
after it has registered, so we depend
+   * on the datanode registration checks that run after each datanode restart 
to catch invalid versions and fence
+   * datanodes.
+   */
+  @Test
   public void testProcessVersionReportHigherApparentVersion()
       throws IOException {
-    final int healthCheckInterval = 200; // milliseconds
-    final int heartbeatInterval = 1; // seconds
-
     OzoneConfiguration conf = getConf();
-    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
-        healthCheckInterval, MILLISECONDS);
-    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
-        heartbeatInterval, SECONDS);
 
     SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
     when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
     EventPublisher eventPublisher = mock(EventPublisher.class);
-    ScmVersionManager versionManager = mockVersionManager();
+    // SCM is pre-finalized: software version is current, apparent version is 
older.
+    ScmVersionManager versionManager = 
mockVersionManager(HDDSLayoutFeature.INITIAL_VERSION);
 
     SCMContext nodeManagerContext = SCMContext.emptyContext();
-    SCMNodeManager nodeManager  = new SCMNodeManager(conf,
+    SCMNodeManager nodeManager = new SCMNodeManager(conf,
         scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
         nodeManagerContext, versionManager);
 
-    // Datanodes should never have higher apparent version than SCM.
-    DatanodeDetails node1 =
-        HddsTestUtils.createRandomDatanodeAndRegister(nodeManager);
+    // Register a pre-finalized datanode to the pre-finalized SCM.
+    DatanodeDetails node1 = MockDatanodeDetails.randomDatanodeDetails();
+    StorageReportProto storageReport = HddsTestUtils.createStorageReport(
+        node1.getID(), node1.getNetworkFullPath(), Long.MAX_VALUE);
+    LayoutVersionProto preFinalizedDNVersion =
+        toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSVersion.SOFTWARE_VERSION);
+    nodeManager.register(node1,
+        
HddsTestUtils.createNodeReport(Collections.singletonList(storageReport), 
emptyList()),
+        getRandomPipelineReports(),
+        preFinalizedDNVersion);
+
+    // DN somehow incorrectly finalizes before SCM. SCM should log an error 
but it will not take action.
+    // There is currently no way for SCM to kick out a datanode after it has 
registered.
     LogCapturer logCapturer = LogCapturer.captureLogs(SCMNodeManager.class);
-    nodeManager.processVersionReport(node1, LARGER_ALL_VERSIONS_PROTO);
-    assertThat(logCapturer.getOutput()).contains("will not be allowed to join 
the cluster");
+    nodeManager.processVersionReport(node1, defaultVersionProto());
+    assertThat(logCapturer.getOutput()).contains("SCM must finalize before 
datanodes.");
+    verify(eventPublisher, times(0)).fireEvent(eq(DATANODE_COMMAND), any());
     nodeManager.close();
   }
 
-  // Currently invoked by testProcessLayoutVersion.
-  public void testProcessVersionReportLowerApparentVersion(boolean 
withScmFinalized) {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
-    when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
-    EventPublisher eventPublisher = mock(EventPublisher.class);
-
-    ScmVersionManager versionManager;
-    if (withScmFinalized) {
-      versionManager = mockVersionManager();
-    } else {
-      // Use an apparent version for SCM that is in between SCM's software 
version and the datanode's apparent version.
-      versionManager = mockVersionManager(HDDSLayoutFeature.SCM_HA);
-    }
-
-    SCMContext nodeManagerContext = SCMContext.emptyContext();
-    SCMNodeManager nodeManager  = new SCMNodeManager(conf,
-        scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
-        nodeManagerContext, versionManager);
-    DatanodeDetails node1 =
-        HddsTestUtils.createRandomDatanodeAndRegister(nodeManager);
-    verify(eventPublisher,
-        times(1)).fireEvent(NEW_NODE, node1);
-    nodeManager.processVersionReport(node1,
-        LayoutVersionProto.newBuilder()
-            
.setMetadataLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.serialize())
-            
.setSoftwareLayoutVersion(versionManager.getSoftwareVersion().serialize())
-            .build());
-    ArgumentCaptor<CommandForDatanode> captor =
-        ArgumentCaptor.forClass(CommandForDatanode.class);
-
-    // SCM will only tell datanodes to finalize after it has finalized.
-    if (withScmFinalized) {
-      verify(eventPublisher, times(1))
-          .fireEvent(eq(DATANODE_COMMAND), captor.capture());
-      assertEquals(captor.getValue().getDatanodeId(), node1.getID());
-      assertEquals(finalizeNewLayoutVersionCommand,
-          captor.getValue().getCommand().getType());
-    } else {
-      verify(eventPublisher, times(0))
-          .fireEvent(eq(DATANODE_COMMAND), captor.capture());
-    }
-  }
-
   @Test
   public void testProcessCommandQueueReport()
       throws IOException, NodeNotFoundException, AuthenticationException {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
index 3c152f1dd15..eff92e27589 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -32,10 +32,14 @@
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.UniformDatanodesFactory;
+import 
org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
 import org.apache.hadoop.ozone.upgrade.RatisBasedVersionManager;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -86,6 +90,41 @@ public void shutdown() {
     }
   }
 
+  @Test
+  public void testFinalizedDatanodesShutDownWithPrefinalizedScm() throws 
Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(SCMStorageConfig.TESTING_INIT_APPARENT_VERSION_KEY, 
HDDSLayoutFeature.INITIAL_VERSION.serialize());
+    
conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SERVER_RPC_FIRST_ELECTION_TIMEOUT, 
"5s");
+    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT, "1s");
+
+    MiniOzoneHAClusterImpl.Builder clusterBuilder = 
MiniOzoneCluster.newHABuilder(conf);
+    clusterBuilder.setNumOfStorageContainerManagers(NUM_SCMS)
+        .setNumOfActiveSCMs(NUM_SCMS)
+        .setSCMServiceId("scmservice")
+        .setNumOfOzoneManagers(1)
+        .setNumDatanodes(NUM_DATANODES)
+        .setDatanodeFactory(UniformDatanodesFactory.newBuilder()
+            .setApparentVersion(HDDSVersion.SOFTWARE_VERSION.serialize())
+            .build());
+
+    // Prevent terminateDatanode() from calling System.exit(1) and killing the 
test JVM.
+    ExitUtil.disableSystemExit();
+    LogCapturer logCapture = 
LogCapturer.captureLogs(RegisterEndpointTask.class);
+    // This starts the mini ozone cluster.
+    cluster = clusterBuilder.build();
+
+    // isStopped cannot be set to true unless a datanode was started first.
+    // Each datanode should be rejected since its apparent version exceeds the 
pre-finalized SCM's.
+    GenericTestUtils.waitFor(
+        () -> 
cluster.getHddsDatanodes().stream().allMatch(HddsDatanodeService::isStopped),
+        500, 30_000);
+
+    assertThat(logCapture.getOutput()).contains("SCM rejected this datanode's 
registration");
+    for (StorageContainerManager scm : 
cluster.getStorageContainerManagersList()) {
+      assertThat(scm.getScmNodeManager().getAllNodes()).isEmpty();
+    }
+  }
+
   @Test
   public void testFinalization() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index 0eacc90936b..143dcef2f0a 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hdds.ComponentVersion;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeID;
@@ -50,6 +51,7 @@
 import org.apache.hadoop.hdds.scm.server.upgrade.ScmVersionManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.upgrade.HDDSVersionUtils;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -84,6 +86,7 @@ public class ReconNodeManager extends SCMNodeManager {
   private Map<DatanodeID, Long> datanodeHeartbeatMap = new HashMap<>();
 
   private final long reconDatanodeOutdatedTime;
+  private final ScmVersionManager versionManager;
 
   public ReconNodeManager(OzoneConfiguration conf,
                           SCMStorageConfig scmStorageConfig,
@@ -93,6 +96,7 @@ public ReconNodeManager(OzoneConfiguration conf,
                           ScmVersionManager versionManager) {
     super(conf, scmStorageConfig, eventPublisher, networkTopology,
         SCMContext.emptyContext(), versionManager);
+    this.versionManager = versionManager;
     final int reconStaleDatanodeMultiplier = 3;
     this.reconDatanodeOutdatedTime = reconStaleDatanodeMultiplier *
         HddsServerUtil.getReconHeartbeatInterval(conf);
@@ -294,8 +298,44 @@ public void removeNode(DatanodeDetails datanodeDetails) 
throws NodeNotFoundExcep
   @Override
   protected void sendFinalizeToDatanodeIfNeeded(DatanodeDetails 
datanodeDetails,
       LayoutVersionProto versionReport) {
-    // Recon will not send commands to datanodes, but it should still log if a 
datanode with an invalid version is
-    // heartbeating.
-    shouldFenceDatanode(datanodeDetails, versionReport);
+    // Recon will not send finalize commands to datanodes.
+  }
+
+
+  /**
+   * Called by the parent class when a datanode registers to determine 
whether it should be allowed in the cluster.
+   * Since Recon finalizes automatically on startup, it should not reject 
datanodes with a lower software version after
+   * it has finalized, unlike SCM.
+   */
+  @Override
+  protected boolean shouldFenceDatanode(DatanodeDetails dnDetails, 
LayoutVersionProto versionReport) {
+    ComponentVersion dnSoftwareVersion = 
HDDSVersionUtils.deserializeHDDSVersionOrLayoutVersion(
+        versionReport.getSoftwareLayoutVersion());
+    ComponentVersion dnApparentVersion = 
HDDSVersionUtils.deserializeHDDSVersionOrLayoutVersion(
+        versionReport.getMetadataLayoutVersion());
+    ComponentVersion reconSoftwareVersion = 
versionManager.getSoftwareVersion();
+    ComponentVersion reconApparentVersion = 
versionManager.getApparentVersion();
+
+    // DN software newer than Recon violates upgrade order. Recon must always 
be upgraded first.
+    if (!dnSoftwareVersion.isSupportedBy(reconSoftwareVersion)) {
+      LOG.error("Datanode {} has software version {} which is newer than Recon 
software version {}. " +
+              "Recon must be upgraded before datanodes.",
+          dnDetails, dnSoftwareVersion, reconSoftwareVersion);
+      return true;
+    }
+
+    // DN apparent version cannot be higher than Recon apparent version. Recon 
must finalize first.
+    if (!versionManager.isAllowed(dnApparentVersion)) {
+      LOG.error("Datanode {} has apparent version {} which is higher than 
Recon apparent version {}. " +
+              "Recon must finalize before datanodes.",
+          dnDetails, dnApparentVersion, reconApparentVersion);
+      return true;
+    }
+
+    // Else, either:
+    // DN software is older than Recon: expected since DNs are upgraded after 
Recon.
+    // DN software matches Recon and DN apparent version <= Recon apparent 
version: DN can register and SCM should
+    // instruct it to finalize if needed.
+    return false;
   }
 }
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
index c1315cee114..d8b761c501a 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
@@ -20,8 +20,13 @@
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.errorNodeNotPermitted;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static 
org.apache.hadoop.hdds.scm.upgrade.ScmUpgradeTestUtils.mockVersionManager;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
+import static 
org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultVersionProto;
+import static 
org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toVersionProto;
 import static org.apache.ratis.util.Preconditions.assertTrue;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,18 +42,24 @@
 import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.ComponentVersion;
+import org.apache.hadoop.hdds.HDDSVersion;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.upgrade.ScmVersionManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -62,6 +73,9 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Tests for Recon Node Manager.
@@ -237,6 +251,70 @@ public void testUpdateNodeOperationalStateFromScm() throws 
Exception {
     assertEquals(datanodeDetails.getUuid(), nodes.get(0).getUuid());
   }
 
+  private static Stream<Arguments> reconDatanodeVersionCombinations() {
+    // Recon accepts old-software DNs even when finalized (unlike SCM), since 
Recon finalizes on startup.
+    return Stream.of(
+        /* RECON PRE-FINALIZED */
+
+        // Old DN accepted
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSLayoutFeature.INITIAL_VERSION),
+            success),
+        // Pre-finalized DN accepted
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSVersion.SOFTWARE_VERSION),
+            success),
+        // Finalized DN rejected (apparent > Recon apparent)
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            defaultVersionProto(),
+            errorNodeNotPermitted),
+        // Newer SW DN rejected
+        Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+            LayoutVersionProto.newBuilder()
+                
.setMetadataLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.serialize())
+                
.setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1).build(),
+            errorNodeNotPermitted),
+
+        /* RECON FINALIZED */
+
+        // Old DN accepted (Recon does not reject old-SW DNs after 
finalization, unlike SCM)
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSLayoutFeature.INITIAL_VERSION),
+            success),
+        // Pre-finalized DN accepted
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, 
HDDSVersion.SOFTWARE_VERSION),
+            success),
+        // Finalized DN accepted
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            defaultVersionProto(),
+            success),
+        // Newer SW DN rejected
+        Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+            LayoutVersionProto.newBuilder()
+                
.setMetadataLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize())
+                
.setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1).build(),
+            errorNodeNotPermitted)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("reconDatanodeVersionCombinations")
+  public void testDatanodeFencingOnRegister(ComponentVersion reconApparent,
+      LayoutVersionProto dnVersionProto, ErrorCode expectedResult) throws 
IOException {
+    ReconStorageConfig scmStorageConfig = new ReconStorageConfig(conf, new 
ReconUtils());
+    EventQueue eventQueue = new EventQueue();
+    NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
+    final Table<DatanodeID, DatanodeDetails> nodeTable = 
ReconSCMDBDefinition.NODES.getTable(store);
+    ScmVersionManager reconVersionManager = mockVersionManager(reconApparent);
+    try (ReconNodeManager reconNodeManager = new ReconNodeManager(conf,
+        scmStorageConfig, eventQueue, clusterMap, nodeTable, 
reconVersionManager, reconContext)) {
+      DatanodeDetails datanodeDetails = randomDatanodeDetails();
+      RegisteredCommand cmd = reconNodeManager.register(datanodeDetails, null, 
null, dnVersionProto);
+      assertEquals(expectedResult, cmd.getError());
+    }
+  }
+
   @Test
   public void testDatanodeUpdate() throws IOException {
     ReconStorageConfig scmStorageConfig =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to