This is an automated email from the ASF dual-hosted git repository.
errose28 pushed a commit to branch HDDS-14496-zdu
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-14496-zdu by this push:
new cff07d54867 HDDS-15482. Add fencing based on datanode versions to SCM
and Recon (#10504)
cff07d54867 is described below
commit cff07d548676ab1b617d84bd1bc41e5250d4f374
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Jun 16 14:51:12 2026 -0400
HDDS-15482. Add fencing based on datanode versions to SCM and Recon (#10504)
Co-authored-by: Cursor <[email protected]>
---
.../states/endpoint/RegisterEndpointTask.java | 9 +
.../hadoop/hdds/scm/node/SCMNodeManager.java | 54 ++--
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 319 ++++++++-------------
.../hadoop/hdds/upgrade/TestScmHAFinalization.java | 39 +++
.../hadoop/ozone/recon/scm/ReconNodeManager.java | 46 ++-
.../ozone/recon/scm/TestReconNodeManager.java | 78 +++++
6 files changed, 323 insertions(+), 222 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index 0492f5da79b..cc946694257 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.container.common.states.endpoint;
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.errorNodeNotPermitted;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success;
import com.google.common.annotations.VisibleForTesting;
@@ -125,6 +126,14 @@ public EndpointStateMachine.EndPointStates call() throws
Exception {
Preconditions.assertEquals(datanodeDetails.getUuidString(),
response.getDatanodeUUID(), "datanodeID");
Preconditions.assertTrue(!StringUtils.isBlank(response.getClusterID()),
"Invalid cluster ID in the response.");
+ if (response.getErrorCode() == errorNodeNotPermitted) {
+ LOG.error("SCM rejected this datanode's registration " +
+ "with software version {} and apparent version {}. " +
+ "This datanode will shut down. Check SCM logs for details and
verify that the " +
+ "datanode software version is compatible with the cluster. ",
+ versionManager.getSoftwareVersion(),
versionManager.getApparentVersion());
+ return
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
+ }
Preconditions.assertSame(success, response.getErrorCode(),
"ErrorCode");
if (response.hasHostname() && response.hasIpAddress()) {
datanodeDetails.setHostName(response.getHostname());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index c9309c6a995..bfd44b0b86f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -728,7 +728,7 @@ public void processNodeReport(DatanodeDetails
datanodeDetails,
public void processVersionReport(DatanodeDetails datanodeDetails,
LayoutVersionProto versionReport) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing Layout Version report from [datanode={}]",
+ LOG.debug("Processing version report from [datanode={}]",
datanodeDetails.getHostName());
}
if (LOG.isTraceEnabled()) {
@@ -741,7 +741,7 @@ public void processVersionReport(DatanodeDetails
datanodeDetails,
nodeStateManager.updateLastKnownVersionInfo(datanodeDetails,
versionReport);
} catch (NodeNotFoundException e) {
- LOG.error("SCM trying to process Layout Version from an " +
+ LOG.error("SCM trying to process version from an " +
"unregistered node {}.", datanodeDetails);
return;
}
@@ -758,6 +758,8 @@ protected void
sendFinalizeToDatanodeIfNeeded(DatanodeDetails datanodeDetails,
ComponentVersion scmSoftwareVersion = versionManager.getSoftwareVersion();
ComponentVersion scmApparentVersion = versionManager.getApparentVersion();
+ // All versions are validated when Datanodes register after they restart.
+ // After that, their apparent version should only increase, and their
software version should remain constant.
if (shouldFenceDatanode(datanodeDetails, dnSoftwareVersion,
dnApparentVersion)) {
LOG.error("Invalid datanode in the cluster : {}. " +
"Datanode software version = {}, " +
@@ -2024,33 +2026,39 @@ protected boolean shouldFenceDatanode(DatanodeDetails
dnDetails, LayoutVersionPr
return shouldFenceDatanode(dnDetails, dnSoftwareVersion,
dnApparentVersion);
}
- /**
- * TODO Update this method to fence datanodes based on their software and apparent version and log the results.
- * For now, maintain the non-rolling upgrade requirement that DN and SCM must have the same software version.
- * Datanodes still cannot have a higher apparent version than SCM.
- */
- private boolean shouldFenceDatanode(DatanodeDetails dnDetails, ComponentVersion softwareVersion,
- ComponentVersion apparentVersion) {
- // Check datanode software version against SCM.
- if (!versionManager.getSoftwareVersion().equals(softwareVersion)) {
- // TODO Once SCM implementation for ZDU is complete, Datanodes with lower software versions will be allowed as
- // long as SCM is pre-finalized.
- LOG.error("Datanode {} with software version {} which does not match SCM software version {} will not be " +
- "allowed to join the cluster. This requirement will be lifted when ZDU is complete.",
- dnDetails, softwareVersion, versionManager.getSoftwareVersion());
+ /**
+ * Decides whether a datanode must be kept out of the cluster by comparing its software and apparent versions
+ * with this SCM's versions. Every rejection is logged at error level together with the reason.
+ *
+ * @param dnDetails datanode being checked; only used in log messages.
+ * @param dnSoftwareVersion software version reported by the datanode.
+ * @param dnApparentVersion apparent version reported by the datanode.
+ * @return true if the datanode should be fenced, false if it may participate in the cluster.
+ */
+ private boolean shouldFenceDatanode(DatanodeDetails dnDetails, ComponentVersion dnSoftwareVersion,
+ ComponentVersion dnApparentVersion) {
+ ComponentVersion scmSoftwareVersion = versionManager.getSoftwareVersion();
+ ComponentVersion scmApparentVersion = versionManager.getApparentVersion();
+
+ // DN software newer than SCM violates upgrade order. SCM must always be upgraded first.
+ if (!dnSoftwareVersion.isSupportedBy(scmSoftwareVersion)) {
+ LOG.error("Datanode {} has software version {} which is newer than SCM software version {}. " +
+ "SCM must be upgraded before datanodes.",
+ dnDetails, dnSoftwareVersion, scmSoftwareVersion);
+ return true;
+ }
+
+ // If DN software is older than SCM and SCM is finalized, the old DNs must be upgraded before rejoining.
+ if (!scmSoftwareVersion.isSupportedBy(dnSoftwareVersion) && !versionManager.needsFinalization()) {
+ LOG.error("Datanode {} has software version {} which is older than SCM software version {} and SCM is " +
+ "finalized. Datanode must be upgraded to join the cluster.",
+ dnDetails, dnSoftwareVersion, scmSoftwareVersion);
return true;
}
+
+ // Even while SCM is pre-finalized, DN software older than SCM's apparent version cannot support features the
+ // cluster has already finalized (e.g. a DN that skipped the previous upgrade), so it must be upgraded first.
+ if (!scmApparentVersion.isSupportedBy(dnSoftwareVersion)) {
+ LOG.error("Datanode {} has software version {} which is older than SCM apparent version {}. " +
+ "Datanode must be upgraded to join the cluster.",
+ dnDetails, dnSoftwareVersion, scmApparentVersion);
+ return true;
+ }
- // Check datanode apparent version against SCM.
- if (!versionManager.isAllowed(apparentVersion)) {
- // Datanodes can never have a higher apparent version than SCM.
- LOG.error("Datanode {} with apparent version {} which is larger than SCM's apparent version {} will not be " +
- "allowed to join the cluster.", dnDetails, apparentVersion, versionManager.getApparentVersion());
+ // DN apparent version cannot be higher than SCM apparent version. SCM must finalize first.
+ if (!versionManager.isAllowed(dnApparentVersion)) {
+ LOG.error("Datanode {} has apparent version {} which is higher than SCM apparent version {}. " +
+ "SCM must finalize before datanodes.",
+ dnDetails, dnApparentVersion, scmApparentVersion);
return true;
}
- // Datanodes with lower apparent version than SCM are allowed in the cluster but will be instructed to finalize
- // if SCM has finalized.
+ // Else, either:
+ // DN software is older than SCM but SCM is pre-finalized and the DN software can still run at SCM's apparent
+ // version: expected since DNs are upgraded after SCM.
+ // DN software matches SCM and DN apparent version <= SCM apparent version: DN can register and will be given a
+ // finalize command if needed.
return false;
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index eb5ffda37ac..289b5db922a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -47,7 +47,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -58,20 +58,17 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.ComponentVersion;
import org.apache.hadoop.hdds.HDDSVersion;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -93,7 +90,6 @@
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -126,37 +122,16 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Test the SCM Node Manager class.
*/
public class TestSCMNodeManager {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestSCMNodeManager.class);
-
private File testDir;
private StorageContainerManager scm;
private SCMContext scmContext;
- private static final LayoutVersionProto LARGER_SOFTWARE_PROTO =
LayoutVersionProto.newBuilder()
- .setMetadataLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize())
- .setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1)
- .build();
- private static final LayoutVersionProto SMALLER_APPARENT_VERSION_PROTO =
- toVersionProto(HDDSLayoutFeature.SCM_HA, HDDSVersion.SOFTWARE_VERSION);
- // In a real cluster, startup is disallowed if apparent version is larger
than software version, so
- // increase both numbers to test smaller software version or larger apparent
version.
- private static final LayoutVersionProto SMALLER_ALL_VERSIONS_PROTO =
- toVersionProto(HDDSLayoutFeature.SCM_HA, HDDSLayoutFeature.SCM_HA);
- private static final LayoutVersionProto LARGER_ALL_VERSIONS_PROTO =
LayoutVersionProto.newBuilder()
- .setMetadataLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() +
1)
- .setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() +
1)
- .build();
- private static final LayoutVersionProto MATCHING_VERSION_PROTO =
defaultVersionProto();
-
@BeforeEach
public void setup() {
testDir = PathUtils.getTestDir(
@@ -308,103 +283,6 @@ private DatanodeDetails
registerWithCapacity(SCMNodeManager nodeManager,
return cmd.getDatanode();
}
- /**
- * Tests that node manager handles layout versions for newly registered nodes
- * correctly.
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws TimeoutException
- */
- // TODO HDDS-15129 This test will need to be updated to check registration
conditions depending on SCM's
- // finalization state.
- // @Test
- public void testScmVersionOnRegister()
- throws Exception {
-
- OzoneConfiguration conf = getConf();
- conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
- 1, TimeUnit.DAYS);
-
- try (SCMNodeManager nodeManager = createNodeManager(conf)) {
- assertTrue(scm.getScmContext().isLeader());
- // Nodes with mismatched SLV cannot join the cluster.
- registerWithCapacity(nodeManager,
- LARGER_SOFTWARE_PROTO, errorNodeNotPermitted);
- registerWithCapacity(nodeManager,
- SMALLER_ALL_VERSIONS_PROTO, errorNodeNotPermitted);
- registerWithCapacity(nodeManager,
- LARGER_ALL_VERSIONS_PROTO, errorNodeNotPermitted);
- // Nodes with mismatched MLV can join
- DatanodeDetails badMlvNode1 = registerWithCapacity(nodeManager,
- SMALLER_APPARENT_VERSION_PROTO, success);
- DatanodeDetails badMlvNode2 = registerWithCapacity(nodeManager,
- SMALLER_APPARENT_VERSION_PROTO, success);
- // This node has correct MLV and SLV
- DatanodeDetails goodNode = registerWithCapacity(nodeManager,
- MATCHING_VERSION_PROTO, success);
-
- assertEquals(3, nodeManager.getAllNodes().size());
-
- scm.exitSafeMode();
-
- assertPipelines(HddsProtos.ReplicationFactor.ONE,
- count -> count == 3,
- Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
- assertPipelines(HddsProtos.ReplicationFactor.THREE,
- count -> count >= 1,
- Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
- }
- }
-
- private void assertPipelines(HddsProtos.ReplicationFactor factor,
- Predicate<Integer> countCheck, Collection<DatanodeDetails> allowedDNs)
- throws Exception {
-
- Set<String> allowedDnIds = allowedDNs.stream()
- .map(DatanodeDetails::getUuidString)
- .collect(Collectors.toSet());
-
- RatisReplicationConfig replConfig = RatisReplicationConfig
- .getInstance(factor);
-
- // Wait for the expected number of pipelines using allowed DNs.
- GenericTestUtils.waitFor(() -> {
- // Closed pipelines are no longer in operation so we should not count
- // them. We cannot check for open pipelines only because this is a mock
- // test so the pipelines may remain in ALLOCATED state.
- List<Pipeline> pipelines = scm.getPipelineManager()
- .getPipelines(replConfig)
- .stream()
- .filter(p -> p.getPipelineState() != Pipeline.PipelineState.CLOSED)
- .collect(Collectors.toList());
- LOG.info("Found {} non-closed pipelines of type {} and factor {}.",
- pipelines.size(),
- replConfig.getReplicationType(), replConfig.getReplicationFactor());
- boolean success = countCheck.test(pipelines.size());
-
- // If we have the correct number of pipelines, make sure that none of
- // these pipelines use nodes outside of allowedDNs.
- if (success) {
- for (Pipeline pipeline: pipelines) {
- for (DatanodeDetails pipelineDN: pipeline.getNodes()) {
- // Do not wait for this condition to be true. Disallowed DNs should
- // never be used once we have the expected number of pipelines.
- if (!allowedDnIds.contains(pipelineDN.getUuidString())) {
- String message = String.format("Pipeline %s used datanode %s " +
- "which is not in the set of allowed datanodes: %s",
- pipeline.getId().toString(), pipelineDN.getUuidString(),
- allowedDnIds);
- fail(message);
- }
- }
- }
- }
-
- return success;
- }, 1000, 10000);
- }
-
/**
* asserts that if we send no heartbeats node manager stays in safemode.
*
@@ -738,13 +616,6 @@ void testScmHandleJvmPause() throws Exception {
}
}
- @Test
- public void testProcessVersionReports() throws IOException {
- testProcessVersionReportLowerApparentVersion(true);
- testProcessVersionReportLowerApparentVersion(false);
- testProcessVersionReportHigherApparentVersion();
- }
-
@Test
public void testDatanodeFinalizedCounterTracksVersionReports()
throws IOException, AuthenticationException {
@@ -783,13 +654,16 @@ public void
testDatanodeFinalizedCounterTracksRegistrationAndRemoveNode()
throws IOException, AuthenticationException, NodeNotFoundException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
DatanodeDetails finalizedNode =
- registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+ registerWithCapacity(nodeManager, defaultVersionProto(), success);
assertEquals(1, nodeManager.getDatanodeFinalizationCounts()
.getNumFinalizedDatanodes(),
"Finalized registration should increment finalized count");
+
+ LayoutVersionProto preFinalizedVersionProto =
+ toVersionProto(HDDSLayoutFeature.SCM_HA,
HDDSVersion.SOFTWARE_VERSION);
DatanodeDetails nonFinalizedNode =
- registerWithCapacity(nodeManager, SMALLER_APPARENT_VERSION_PROTO,
success);
+ registerWithCapacity(nodeManager, preFinalizedVersionProto, success);
assertEquals(1, nodeManager.getDatanodeFinalizationCounts()
.getNumFinalizedDatanodes(),
"Non-finalized registration should not increment finalized count");
@@ -816,6 +690,88 @@ private static Stream<Arguments> ineligibleHealthStates() {
);
}
+ /**
+ * Supplies cases for {@link #testDatanodeFencingOnRegister}: SCM's apparent version, the version proto the
+ * datanode registers with, the expected registration result, and whether SCM should then send a finalize command.
+ * SCM's software version is always {@link HDDSVersion#SOFTWARE_VERSION}; every other version is relative to it.
+ */
+ private static Stream<Arguments> scmDatanodeVersionCombinations() {
+ ComponentVersion preFinalizedScm = HDDSLayoutFeature.INITIAL_VERSION;
+ ComponentVersion finalizedScm = HDDSVersion.SOFTWARE_VERSION;
+
+ // Datanode running old software.
+ LayoutVersionProto oldDn =
+ toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, HDDSLayoutFeature.INITIAL_VERSION);
+ // Datanode running current software that has not finalized yet.
+ LayoutVersionProto preFinalizedDn =
+ toVersionProto(HDDSLayoutFeature.INITIAL_VERSION, HDDSVersion.SOFTWARE_VERSION);
+ // Datanode running current software that has finalized.
+ LayoutVersionProto finalizedDn = defaultVersionProto();
+
+ return Stream.of(
+ // SCM pre-finalized: old and pre-finalized DNs accepted, finalized DN rejected.
+ Arguments.of(preFinalizedScm, oldDn, success, false),
+ Arguments.of(preFinalizedScm, preFinalizedDn, success, false),
+ Arguments.of(preFinalizedScm, finalizedDn, errorNodeNotPermitted, false),
+ // Newer DN rejected, even though its apparent version matches SCM.
+ Arguments.of(preFinalizedScm, newerSoftwareDnVersionProto(preFinalizedScm), errorNodeNotPermitted, false),
+
+ // SCM finalized: old DN rejected, pre-finalized DN accepted but instructed to finalize, finalized DN accepted.
+ Arguments.of(finalizedScm, oldDn, errorNodeNotPermitted, false),
+ Arguments.of(finalizedScm, preFinalizedDn, success, true),
+ Arguments.of(finalizedScm, finalizedDn, success, false),
+ // Newer DN rejected, even though its apparent version matches SCM.
+ Arguments.of(finalizedScm, newerSoftwareDnVersionProto(finalizedScm), errorNodeNotPermitted, false));
+ }
+
+ /**
+ * Builds the version proto of a datanode whose software is one version newer than SCM's.
+ *
+ * @param dnApparentVersion apparent (metadata layout) version the datanode reports.
+ */
+ private static LayoutVersionProto newerSoftwareDnVersionProto(ComponentVersion dnApparentVersion) {
+ return LayoutVersionProto.newBuilder()
+ .setMetadataLayoutVersion(dnApparentVersion.serialize())
+ .setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1)
+ .build();
+ }
+
+ /**
+ * Registers one datanode with the given versions to an SCM with the given apparent version and checks that SCM
+ * accepts or fences it as expected. Accepted datanodes then send a version report to check whether SCM instructs
+ * them to finalize.
+ */
+ @ParameterizedTest
+ @MethodSource("scmDatanodeVersionCombinations")
+ public void testDatanodeFencingOnRegister(ComponentVersion scmApparent, LayoutVersionProto dnVersionProto,
+ ErrorCode expectedResult, boolean expectFinalizeCmd) throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
+ when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
+ EventPublisher eventPublisher = mock(EventPublisher.class);
+ ScmVersionManager versionManager = mockVersionManager(scmApparent);
+
+ try (SCMNodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig, eventPublisher,
+ new NetworkTopologyImpl(conf), SCMContext.emptyContext(), versionManager)) {
+ DatanodeDetails node = MockDatanodeDetails.randomDatanodeDetails();
+ StorageReportProto storageReport = HddsTestUtils.createStorageReport(
+ node.getID(), node.getNetworkFullPath(), Long.MAX_VALUE);
+ RegisteredCommand cmd = nodeManager.register(node,
+ HddsTestUtils.createNodeReport(Collections.singletonList(storageReport), emptyList()),
+ getRandomPipelineReports(), dnVersionProto);
+
+ assertEquals(expectedResult, cmd.getError());
+ // A fenced datanode must not be added to the node manager; an accepted one must be.
+ int expectedNodeCount = expectedResult == success ? 1 : 0;
+ assertEquals(expectedNodeCount, nodeManager.getAllNodes().size(),
+ "Unexpected number of nodes after registration returned " + expectedResult);
+
+ if (expectedResult == success) {
+ // SCM ignores version reports from unregistered nodes, so only accepted nodes are checked here.
+ nodeManager.processVersionReport(node, dnVersionProto);
+ if (expectFinalizeCmd) {
+ ArgumentCaptor<CommandForDatanode> captor = ArgumentCaptor.forClass(CommandForDatanode.class);
+ verify(eventPublisher, times(1)).fireEvent(eq(DATANODE_COMMAND), captor.capture());
+ assertEquals(node.getID(), captor.getValue().getDatanodeId());
+ assertEquals(finalizeNewLayoutVersionCommand, captor.getValue().getCommand().getType());
+ } else {
+ verify(eventPublisher, times(0)).fireEvent(eq(DATANODE_COMMAND), any());
+ }
+ }
+ }
+ }
+
@ParameterizedTest
@MethodSource("ineligibleHealthStates")
public void testDatanodeFinalizedCounterExcludesNonHealthyNodes(NodeStatus
expectedStatus)
@@ -829,10 +785,10 @@ public void
testDatanodeFinalizedCounterExcludesNonHealthyNodes(NodeStatus expec
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
// transitionNode stops heartbeating and will become STALE or DEAD
DatanodeDetails transitionNode =
- registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+ registerWithCapacity(nodeManager, defaultVersionProto(), success);
// heartbeatingNode keeps heartbeating as a healthy baseline
DatanodeDetails heartbeatingNode =
- registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+ registerWithCapacity(nodeManager, defaultVersionProto(), success);
nodeManager.processHeartbeat(transitionNode);
nodeManager.processHeartbeat(heartbeatingNode);
@@ -873,7 +829,7 @@ public void
testDatanodeFinalizedCounterIncludesAllHealthyOpStates(
throws IOException, AuthenticationException, NodeNotFoundException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
DatanodeDetails node =
- registerWithCapacity(nodeManager, MATCHING_VERSION_PROTO, success);
+ registerWithCapacity(nodeManager, defaultVersionProto(), success);
nodeManager.setNodeOperationalState(node, opState);
// All HEALTHY nodes should be counted regardless of operational state
@@ -884,81 +840,52 @@ public void
testDatanodeFinalizedCounterIncludesAllHealthyOpStates(
}
}
- // Currently invoked by testProcessLayoutVersion.
+ /**
+ * A datanode's software version can only increase with a restart, but its
apparent version can change while it is
+ * running and registered to SCM. This is not supposed to happen until SCM's
apparent version has changed and it
+ * instructs a datanode to finalize, but this test covers a defensive check
that SCM will not send commands to a
+ * datanode that somehow ends up with a higher apparent version.
+ *
+ * There is currently no way for SCM to remove a datanode from the cluster
after it has registered, so we depend
+ * on the datanode registration checks that run after each datanode restart
to catch invalid versions and fence
+ * datanodes.
+ */
+ @Test
public void testProcessVersionReportHigherApparentVersion()
throws IOException {
- final int healthCheckInterval = 200; // milliseconds
- final int heartbeatInterval = 1; // seconds
-
OzoneConfiguration conf = getConf();
- conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
- healthCheckInterval, MILLISECONDS);
- conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
- heartbeatInterval, SECONDS);
SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
EventPublisher eventPublisher = mock(EventPublisher.class);
- ScmVersionManager versionManager = mockVersionManager();
+ // SCM is pre-finalized: software version is current, apparent version is
older.
+ ScmVersionManager versionManager =
mockVersionManager(HDDSLayoutFeature.INITIAL_VERSION);
SCMContext nodeManagerContext = SCMContext.emptyContext();
- SCMNodeManager nodeManager = new SCMNodeManager(conf,
+ SCMNodeManager nodeManager = new SCMNodeManager(conf,
scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
nodeManagerContext, versionManager);
- // Datanodes should never have higher apparent version than SCM.
- DatanodeDetails node1 =
- HddsTestUtils.createRandomDatanodeAndRegister(nodeManager);
+ // Register a pre-finalized datanode to the pre-finalized SCM.
+ DatanodeDetails node1 = MockDatanodeDetails.randomDatanodeDetails();
+ StorageReportProto storageReport = HddsTestUtils.createStorageReport(
+ node1.getID(), node1.getNetworkFullPath(), Long.MAX_VALUE);
+ LayoutVersionProto preFinalizedDNVersion =
+ toVersionProto(HDDSLayoutFeature.INITIAL_VERSION,
HDDSVersion.SOFTWARE_VERSION);
+ nodeManager.register(node1,
+
HddsTestUtils.createNodeReport(Collections.singletonList(storageReport),
emptyList()),
+ getRandomPipelineReports(),
+ preFinalizedDNVersion);
+
+ // DN somehow incorrectly finalizes before SCM. SCM should log an error
but it will not take action.
+ // There is currently no way for SCM to kick out a datanode after it has
registered.
LogCapturer logCapturer = LogCapturer.captureLogs(SCMNodeManager.class);
- nodeManager.processVersionReport(node1, LARGER_ALL_VERSIONS_PROTO);
- assertThat(logCapturer.getOutput()).contains("will not be allowed to join
the cluster");
+ nodeManager.processVersionReport(node1, defaultVersionProto());
+ assertThat(logCapturer.getOutput()).contains("SCM must finalize before
datanodes.");
+ verify(eventPublisher, times(0)).fireEvent(eq(DATANODE_COMMAND), any());
nodeManager.close();
}
- // Currently invoked by testProcessLayoutVersion.
- public void testProcessVersionReportLowerApparentVersion(boolean
withScmFinalized) {
- OzoneConfiguration conf = new OzoneConfiguration();
- SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
- when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
- EventPublisher eventPublisher = mock(EventPublisher.class);
-
- ScmVersionManager versionManager;
- if (withScmFinalized) {
- versionManager = mockVersionManager();
- } else {
- // Use an apparent version for SCM that is in between SCM's software
version and the datanode's apparent version.
- versionManager = mockVersionManager(HDDSLayoutFeature.SCM_HA);
- }
-
- SCMContext nodeManagerContext = SCMContext.emptyContext();
- SCMNodeManager nodeManager = new SCMNodeManager(conf,
- scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf),
- nodeManagerContext, versionManager);
- DatanodeDetails node1 =
- HddsTestUtils.createRandomDatanodeAndRegister(nodeManager);
- verify(eventPublisher,
- times(1)).fireEvent(NEW_NODE, node1);
- nodeManager.processVersionReport(node1,
- LayoutVersionProto.newBuilder()
-
.setMetadataLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.serialize())
-
.setSoftwareLayoutVersion(versionManager.getSoftwareVersion().serialize())
- .build());
- ArgumentCaptor<CommandForDatanode> captor =
- ArgumentCaptor.forClass(CommandForDatanode.class);
-
- // SCM will only tell datanodes to finalize after it has finalized.
- if (withScmFinalized) {
- verify(eventPublisher, times(1))
- .fireEvent(eq(DATANODE_COMMAND), captor.capture());
- assertEquals(captor.getValue().getDatanodeId(), node1.getID());
- assertEquals(finalizeNewLayoutVersionCommand,
- captor.getValue().getCommand().getType());
- } else {
- verify(eventPublisher, times(0))
- .fireEvent(eq(DATANODE_COMMAND), captor.capture());
- }
- }
-
@Test
public void testProcessCommandQueueReport()
throws IOException, NodeNotFoundException, AuthenticationException {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
index 3c152f1dd15..eff92e27589 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -32,10 +32,14 @@
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.UniformDatanodesFactory;
+import
org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
import org.apache.hadoop.ozone.upgrade.RatisBasedVersionManager;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -86,6 +90,41 @@ public void shutdown() {
}
}
+ /**
+ * Starts an HA cluster whose SCMs are pre-finalized at the initial layout version while every datanode reports the
+ * current software version as its apparent version. Each datanode's registration should be rejected, so the
+ * datanodes stop and no SCM node manager ends up tracking any of them.
+ */
+ @Test
+ public void testFinalizedDatanodesShutDownWithPrefinalizedScm() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(SCMStorageConfig.TESTING_INIT_APPARENT_VERSION_KEY, HDDSLayoutFeature.INITIAL_VERSION.serialize());
+ conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SERVER_RPC_FIRST_ELECTION_TIMEOUT, "5s");
+ // Set the config key itself; the *_DEFAULT constant is the default value, not a key, and would be ignored.
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "1s");
+
+ MiniOzoneHAClusterImpl.Builder clusterBuilder = MiniOzoneCluster.newHABuilder(conf);
+ clusterBuilder.setNumOfStorageContainerManagers(NUM_SCMS)
+ .setNumOfActiveSCMs(NUM_SCMS)
+ .setSCMServiceId("scmservice")
+ .setNumOfOzoneManagers(1)
+ .setNumDatanodes(NUM_DATANODES)
+ .setDatanodeFactory(UniformDatanodesFactory.newBuilder()
+ .setApparentVersion(HDDSVersion.SOFTWARE_VERSION.serialize())
+ .build());
+
+ // Prevent terminateDatanode() from calling System.exit(1) and killing the test JVM.
+ ExitUtil.disableSystemExit();
+ LogCapturer logCapture = LogCapturer.captureLogs(RegisterEndpointTask.class);
+ // This starts the mini ozone cluster.
+ cluster = clusterBuilder.build();
+
+ // isStopped cannot be set to true unless a datanode was started first.
+ // Each datanode should be rejected since its apparent version exceeds the pre-finalized SCM's.
+ GenericTestUtils.waitFor(
+ () -> cluster.getHddsDatanodes().stream().allMatch(HddsDatanodeService::isStopped),
+ 500, 30_000);
+
+ assertThat(logCapture.getOutput()).contains("SCM rejected this datanode's registration");
+ for (StorageContainerManager scm : cluster.getStorageContainerManagersList()) {
+ assertThat(scm.getScmNodeManager().getAllNodes()).isEmpty();
+ }
+ }
+
@Test
public void testFinalization() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index 0eacc90936b..143dcef2f0a 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hdds.ComponentVersion;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
@@ -50,6 +51,7 @@
import org.apache.hadoop.hdds.scm.server.upgrade.ScmVersionManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.upgrade.HDDSVersionUtils;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -84,6 +86,7 @@ public class ReconNodeManager extends SCMNodeManager {
private Map<DatanodeID, Long> datanodeHeartbeatMap = new HashMap<>();
private final long reconDatanodeOutdatedTime;
+ private final ScmVersionManager versionManager;
public ReconNodeManager(OzoneConfiguration conf,
SCMStorageConfig scmStorageConfig,
@@ -93,6 +96,7 @@ public ReconNodeManager(OzoneConfiguration conf,
ScmVersionManager versionManager) {
super(conf, scmStorageConfig, eventPublisher, networkTopology,
SCMContext.emptyContext(), versionManager);
+ this.versionManager = versionManager;
final int reconStaleDatanodeMultiplier = 3;
this.reconDatanodeOutdatedTime = reconStaleDatanodeMultiplier *
HddsServerUtil.getReconHeartbeatInterval(conf);
@@ -294,8 +298,44 @@ public void removeNode(DatanodeDetails datanodeDetails)
throws NodeNotFoundExcep
@Override
protected void sendFinalizeToDatanodeIfNeeded(DatanodeDetails
datanodeDetails,
LayoutVersionProto versionReport) {
- // Recon will not send commands to datanodes, but it should still log if a
datanode with an invalid version is
- // heartbeating.
- shouldFenceDatanode(datanodeDetails, versionReport);
+ // Recon will not send finalize commands to datanodes, so this is intentionally a no-op.
+ // Version checks are no longer run from here; Recon's shouldFenceDatanode override is
+ // consulted when a datanode registers instead.
+ }
+
+
+ /**
+ * Called by the parent class when a datanode registers, to determine whether it should be allowed in the cluster.
+ * Since Recon finalizes automatically on startup, it should not reject datanodes with lower software version after
+ * it has finalized, unlike SCM. A datanode is still rejected if its software version is not supported by Recon's
+ * software version, or if its apparent version is not allowed by Recon's version manager.
+ *
+ * @param dnDetails the registering datanode; used here only in log messages.
+ * @param versionReport the version report sent by the datanode. Its software layout version is read as the
+ *     datanode software version and its metadata layout version as the datanode apparent version.
+ * @return true if the datanode must be rejected (fenced), false if it may register.
+ */
+ @Override
+ protected boolean shouldFenceDatanode(DatanodeDetails dnDetails,
LayoutVersionProto versionReport) {
+ ComponentVersion dnSoftwareVersion =
HDDSVersionUtils.deserializeHDDSVersionOrLayoutVersion(
+ versionReport.getSoftwareLayoutVersion());
+ ComponentVersion dnApparentVersion =
HDDSVersionUtils.deserializeHDDSVersionOrLayoutVersion(
+ versionReport.getMetadataLayoutVersion());
+ ComponentVersion reconSoftwareVersion =
versionManager.getSoftwareVersion();
+ ComponentVersion reconApparentVersion =
versionManager.getApparentVersion();
+
+ // DN software newer than Recon violates upgrade order. Recon must always be upgraded first.
+ if (!dnSoftwareVersion.isSupportedBy(reconSoftwareVersion)) {
+ LOG.error("Datanode {} has software version {} which is newer than Recon
software version {}. " +
+ "Recon must be upgraded before datanodes.",
+ dnDetails, dnSoftwareVersion, reconSoftwareVersion);
+ return true;
+ }
+
+ // DN apparent version cannot be higher than Recon apparent version. Recon must finalize first.
+ if (!versionManager.isAllowed(dnApparentVersion)) {
+ LOG.error("Datanode {} has apparent version {} which is higher than
Recon apparent version {}. " +
+ "Recon must finalize before datanodes.",
+ dnDetails, dnApparentVersion, reconApparentVersion);
+ return true;
+ }
+
+ // Else, either:
+ // - DN software is older than Recon: expected since DNs are upgraded after Recon.
+ // - DN software matches Recon and DN apparent version <= Recon apparent version: DN can register.
+ //   Recon itself sends no finalize commands (see sendFinalizeToDatanodeIfNeeded); SCM is the
+ //   component that should instruct the DN to finalize if needed.
+ return false;
}
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
index c1315cee114..d8b761c501a 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
@@ -20,8 +20,13 @@
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.errorNodeNotPermitted;
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static
org.apache.hadoop.hdds.scm.upgrade.ScmUpgradeTestUtils.mockVersionManager;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
+import static
org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultVersionProto;
+import static
org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toVersionProto;
import static org.apache.ratis.util.Preconditions.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,18 +42,24 @@
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.ComponentVersion;
+import org.apache.hadoop.hdds.HDDSVersion;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.scm.server.upgrade.ScmVersionManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -62,6 +73,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* Tests for Recon Node Manager.
@@ -237,6 +251,70 @@ public void testUpdateNodeOperationalStateFromScm() throws
Exception {
assertEquals(datanodeDetails.getUuid(), nodes.get(0).getUuid());
}
+ /**
+ * Registration cases for testDatanodeFencingOnRegister. Each case supplies, in order: the Recon
+ * apparent version passed to mockVersionManager, the version proto reported by the registering
+ * datanode, and the error code Recon's register call is expected to return.
+ */
+ private static Stream<Arguments> reconDatanodeVersionCombinations() {
+ // Recon accepts old-software DNs even when finalized (unlike SCM), since Recon finalizes on startup.
+ return Stream.of(
+ /* RECON PRE-FINALIZED */
+
+ // Old DN accepted
+ Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+ toVersionProto(HDDSLayoutFeature.INITIAL_VERSION,
HDDSLayoutFeature.INITIAL_VERSION),
+ success),
+ // Pre-finalized DN accepted
+ Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+ toVersionProto(HDDSLayoutFeature.INITIAL_VERSION,
HDDSVersion.SOFTWARE_VERSION),
+ success),
+ // Finalized DN rejected (apparent > Recon apparent)
+ Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+ defaultVersionProto(),
+ errorNodeNotPermitted),
+ // Newer SW DN rejected (DN software version is one above this build's SOFTWARE_VERSION)
+ Arguments.of(HDDSLayoutFeature.INITIAL_VERSION,
+ LayoutVersionProto.newBuilder()
+
.setMetadataLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.serialize())
+
.setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1).build(),
+ errorNodeNotPermitted),
+
+ /* RECON FINALIZED */
+
+ // Old DN accepted (Recon does not reject old-SW DNs after finalization, unlike SCM)
+ Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+ toVersionProto(HDDSLayoutFeature.INITIAL_VERSION,
HDDSLayoutFeature.INITIAL_VERSION),
+ success),
+ // Pre-finalized DN accepted
+ Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+ toVersionProto(HDDSLayoutFeature.INITIAL_VERSION,
HDDSVersion.SOFTWARE_VERSION),
+ success),
+ // Finalized DN accepted
+ Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+ defaultVersionProto(),
+ success),
+ // Newer SW DN rejected even though Recon is finalized
+ Arguments.of(HDDSVersion.SOFTWARE_VERSION,
+ LayoutVersionProto.newBuilder()
+
.setMetadataLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize())
+
.setSoftwareLayoutVersion(HDDSVersion.SOFTWARE_VERSION.serialize() + 1).build(),
+ errorNodeNotPermitted)
+ );
+ }
+
+ /**
+ * Registers a random datanode reporting {@code dnVersionProto} with a ReconNodeManager whose version
+ * manager is built by mockVersionManager(reconApparent), and asserts that the registration result
+ * carries {@code expectedResult}.
+ */
+ @ParameterizedTest
+ @MethodSource("reconDatanodeVersionCombinations")
+ public void testDatanodeFencingOnRegister(ComponentVersion reconApparent,
+ LayoutVersionProto dnVersionProto, ErrorCode expectedResult) throws
IOException {
+ ReconStorageConfig scmStorageConfig = new ReconStorageConfig(conf, new
ReconUtils());
+ EventQueue eventQueue = new EventQueue();
+ NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
+ final Table<DatanodeID, DatanodeDetails> nodeTable =
ReconSCMDBDefinition.NODES.getTable(store);
+ ScmVersionManager reconVersionManager = mockVersionManager(reconApparent);
+ try (ReconNodeManager reconNodeManager = new ReconNodeManager(conf,
+ scmStorageConfig, eventQueue, clusterMap, nodeTable,
reconVersionManager, reconContext)) {
+ DatanodeDetails datanodeDetails = randomDatanodeDetails();
+ // Only the version report matters for fencing here; the other register arguments are passed as null.
+ RegisteredCommand cmd = reconNodeManager.register(datanodeDetails, null,
null, dnVersionProto);
+ assertEquals(expectedResult, cmd.getError());
+ }
+ }
+
@Test
public void testDatanodeUpdate() throws IOException {
ReconStorageConfig scmStorageConfig =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]