This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 397f62f9cf HDDS-9645. Recon should exclude out-of-service nodes when
checking for healthy containers (#5651)
397f62f9cf is described below
commit 397f62f9cf9832f7316c3cfc49110422e2bfb849
Author: Christos Bisias <[email protected]>
AuthorDate: Sat Jan 27 18:39:13 2024 +0200
HDDS-9645. Recon should exclude out-of-service nodes when checking for
healthy containers (#5651)
---
.../scm/node/TestDecommissionAndMaintenance.java | 121 ++---
.../apache/hadoop/hdds/scm/node/TestNodeUtil.java | 102 +++++
.../ozone/recon/TestReconAndAdminContainerCLI.java | 485 +++++++++++++++++++++
.../hadoop/ozone/recon/TestReconEndpointUtil.java | 185 ++++++++
.../api/types/UnhealthyContainerMetadata.java | 4 +
.../api/types/UnhealthyContainersResponse.java | 4 +
.../ozone/recon/fsck/ContainerHealthStatus.java | 77 +++-
.../ozone/recon/fsck/ContainerHealthTask.java | 31 +-
.../ozone/recon/persistence/ContainerHistory.java | 10 +-
.../scm/ReconStorageContainerManagerFacade.java | 2 +-
.../recon/fsck/TestContainerHealthStatus.java | 221 +++++++++-
.../ozone/recon/fsck/TestContainerHealthTask.java | 10 +-
.../TestContainerHealthTaskRecordGenerator.java | 39 +-
13 files changed, 1138 insertions(+), 153 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/node/TestDecommissionAndMaintenance.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/node/TestDecommissionAndMaintenance.java
index 86aec28519..5ebf9b56a8 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/node/TestDecommissionAndMaintenance.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/node/TestDecommissionAndMaintenance.java
@@ -64,6 +64,10 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.getDNHostAndPort;
+import static
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachHealthState;
+import static
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachOpState;
+import static
org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachPersistedOpState;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -209,7 +213,7 @@ public class TestDecommissionAndMaintenance {
scmClient.decommissionNodes(Arrays.asList(
getDNHostAndPort(toDecommission)));
- waitForDnToReachOpState(toDecommission, DECOMMISSIONED);
+ waitForDnToReachOpState(nm, toDecommission, DECOMMISSIONED);
// Ensure one node transitioned to DECOMMISSIONING
List<DatanodeDetails> decomNodes = nm.getNodes(
DECOMMISSIONED,
@@ -225,7 +229,7 @@ public class TestDecommissionAndMaintenance {
// Stop the decommissioned DN
int dnIndex = cluster.getHddsDatanodeIndex(toDecommission);
cluster.shutdownHddsDatanode(toDecommission);
- waitForDnToReachHealthState(toDecommission, DEAD);
+ waitForDnToReachHealthState(nm, toDecommission, DEAD);
// Now the decommissioned node is dead, we should have
// 3 replicas for the tracked container.
@@ -236,7 +240,7 @@ public class TestDecommissionAndMaintenance {
cluster.restartHddsDatanode(dnIndex, true);
scmClient.recommissionNodes(Arrays.asList(
getDNHostAndPort(toDecommission)));
- waitForDnToReachOpState(toDecommission, IN_SERVICE);
+ waitForDnToReachOpState(nm, toDecommission, IN_SERVICE);
waitForDnToReachPersistedOpState(toDecommission, IN_SERVICE);
}
@@ -272,7 +276,7 @@ public class TestDecommissionAndMaintenance {
// After the SCM restart, the DN should report as DECOMMISSIONING, then
// it should re-enter the decommission workflow and move to DECOMMISSIONED
DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
- waitForDnToReachOpState(newDn, DECOMMISSIONED);
+ waitForDnToReachOpState(nm, newDn, DECOMMISSIONED);
waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
// Now the node is decommissioned, so restart SCM again
@@ -282,7 +286,7 @@ public class TestDecommissionAndMaintenance {
// On initial registration, the DN should report its operational state
// and if it is decommissioned, that should be updated in the NodeStatus
- waitForDnToReachOpState(newDn, DECOMMISSIONED);
+ waitForDnToReachOpState(nm, newDn, DECOMMISSIONED);
// Also confirm the datanodeDetails correctly reflect the operational
// state.
waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
@@ -291,7 +295,7 @@ public class TestDecommissionAndMaintenance {
// reflect the state of in SCM, in IN_SERVICE.
int dnIndex = cluster.getHddsDatanodeIndex(dn);
cluster.shutdownHddsDatanode(dnIndex);
- waitForDnToReachHealthState(dn, DEAD);
+ waitForDnToReachHealthState(nm, dn, DEAD);
// Datanode is shutdown and dead. Now recommission it in SCM
scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
// Now restart it and ensure it remains IN_SERVICE
@@ -301,8 +305,8 @@ public class TestDecommissionAndMaintenance {
// As this is not an initial registration since SCM was started, the DN
// should report its operational state and if it differs from what SCM
// has, then the SCM state should be used and the DN state updated.
- waitForDnToReachHealthState(newDn, HEALTHY);
- waitForDnToReachOpState(newDn, IN_SERVICE);
+ waitForDnToReachHealthState(nm, newDn, HEALTHY);
+ waitForDnToReachOpState(nm, newDn, IN_SERVICE);
waitForDnToReachPersistedOpState(newDn, IN_SERVICE);
}
@@ -342,7 +346,7 @@ public class TestDecommissionAndMaintenance {
scmClient.startMaintenanceNodes(Arrays.asList(
getDNHostAndPort(dn)), 0);
- waitForDnToReachOpState(dn, IN_MAINTENANCE);
+ waitForDnToReachOpState(nm, dn, IN_MAINTENANCE);
waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);
// Should still be 3 replicas online as no replication should happen for
@@ -356,7 +360,7 @@ public class TestDecommissionAndMaintenance {
// Stop the maintenance DN
cluster.shutdownHddsDatanode(dn);
- waitForDnToReachHealthState(dn, DEAD);
+ waitForDnToReachHealthState(nm, dn, DEAD);
// Now the maintenance node is dead, we should still have
// 3 replicas as we don't purge the replicas for a dead maintenance node
@@ -368,13 +372,13 @@ public class TestDecommissionAndMaintenance {
// Restart the DN and it should keep the IN_MAINTENANCE state
cluster.restartHddsDatanode(dn, true);
DatanodeDetails newDN = nm.getNodeByUuid(dn.getUuid().toString());
- waitForDnToReachHealthState(newDN, HEALTHY);
+ waitForDnToReachHealthState(nm, newDN, HEALTHY);
waitForDnToReachPersistedOpState(newDN, IN_MAINTENANCE);
// Stop the DN and wait for it to go dead.
int dnIndex = cluster.getHddsDatanodeIndex(dn);
cluster.shutdownHddsDatanode(dnIndex);
- waitForDnToReachHealthState(dn, DEAD);
+ waitForDnToReachHealthState(nm, dn, DEAD);
// Datanode is shutdown and dead. Now recommission it in SCM
scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
@@ -386,8 +390,8 @@ public class TestDecommissionAndMaintenance {
// As this is not an initial registration since SCM was started, the DN
// should report its operational state and if it differs from what SCM
// has, then the SCM state should be used and the DN state updated.
- waitForDnToReachHealthState(newDn, HEALTHY);
- waitForDnToReachOpState(newDn, IN_SERVICE);
+ waitForDnToReachHealthState(nm, newDn, HEALTHY);
+ waitForDnToReachOpState(nm, newDn, IN_SERVICE);
waitForDnToReachPersistedOpState(dn, IN_SERVICE);
}
@@ -410,7 +414,7 @@ public class TestDecommissionAndMaintenance {
replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));
scmClient.startMaintenanceNodes(forMaintenance.stream()
- .map(this::getDNHostAndPort)
+ .map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()), 0);
// Ensure all 3 DNs go to maintenance
@@ -429,7 +433,7 @@ public class TestDecommissionAndMaintenance {
// Ensure all 3 DNs go to maintenance
for (DatanodeDetails dn : forMaintenance) {
- waitForDnToReachOpState(dn, IN_SERVICE);
+ waitForDnToReachOpState(nm, dn, IN_SERVICE);
}
waitForContainerReplicas(container, 3);
@@ -444,18 +448,18 @@ public class TestDecommissionAndMaintenance {
.limit(2)
.collect(Collectors.toList());
scmClient.startMaintenanceNodes(ecMaintenance.stream()
- .map(this::getDNHostAndPort)
+ .map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()), 0);
for (DatanodeDetails dn : ecMaintenance) {
waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);
}
assertThat(cm.getContainerReplicas(ecContainer.containerID()).size()).isGreaterThanOrEqualTo(6);
scmClient.recommissionNodes(ecMaintenance.stream()
- .map(this::getDNHostAndPort)
+ .map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()));
// Ensure the 2 DNs go to IN_SERVICE
for (DatanodeDetails dn : ecMaintenance) {
- waitForDnToReachOpState(dn, IN_SERVICE);
+ waitForDnToReachOpState(nm, dn, IN_SERVICE);
}
waitForContainerReplicas(ecContainer, 5);
}
@@ -478,7 +482,7 @@ public class TestDecommissionAndMaintenance {
replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));
scmClient.startMaintenanceNodes(forMaintenance.stream()
- .map(this::getDNHostAndPort)
+ .map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()), 0);
// Ensure all 3 DNs go to entering_maintenance
@@ -495,7 +499,7 @@ public class TestDecommissionAndMaintenance {
// Ensure all 3 DNs go to maintenance
for (DatanodeDetails dn : newDns) {
- waitForDnToReachOpState(dn, IN_MAINTENANCE);
+ waitForDnToReachOpState(nm, dn, IN_MAINTENANCE);
}
// There should now be 5-6 replicas of the container we are tracking
@@ -525,7 +529,7 @@ public class TestDecommissionAndMaintenance {
// decommission interface only allows us to specify hours from now as the
// end time, that is not really suitable for a test like this.
nm.setNodeOperationalState(dn, IN_MAINTENANCE, newEndTime);
- waitForDnToReachOpState(dn, IN_SERVICE);
+ waitForDnToReachOpState(nm, dn, IN_SERVICE);
waitForDnToReachPersistedOpState(dn, IN_SERVICE);
// Put the node back into maintenance and then stop it and wait for it to
@@ -533,11 +537,11 @@ public class TestDecommissionAndMaintenance {
scmClient.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0);
waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);
cluster.shutdownHddsDatanode(dn);
- waitForDnToReachHealthState(dn, DEAD);
+ waitForDnToReachHealthState(nm, dn, DEAD);
newEndTime = System.currentTimeMillis() / 1000 + 5;
nm.setNodeOperationalState(dn, IN_MAINTENANCE, newEndTime);
- waitForDnToReachOpState(dn, IN_SERVICE);
+ waitForDnToReachOpState(nm, dn, IN_SERVICE);
// Ensure there are 3 replicas not including the dead node, indicating a
new
// replica was created
GenericTestUtils.waitFor(() -> getContainerReplicas(container)
@@ -584,7 +588,7 @@ public class TestDecommissionAndMaintenance {
// Now let the node go dead and repeat the test. This time ensure a new
// replica is created.
cluster.shutdownHddsDatanode(dn);
- waitForDnToReachHealthState(dn, DEAD);
+ waitForDnToReachHealthState(nm, dn, DEAD);
cluster.restartStorageContainerManager(false);
setManagers();
@@ -631,18 +635,6 @@ public class TestDecommissionAndMaintenance {
}
}
- /**
- * Retrieves the NodeStatus for the given DN or fails the test if the
- * Node cannot be found. This is a helper method to allow the nodeStatus to
be
- * checked in lambda expressions.
- * @param dn Datanode for which to retrieve the NodeStatus.
- * @return
- */
- private NodeStatus getNodeStatus(DatanodeDetails dn) {
- return assertDoesNotThrow(() -> nm.getNodeStatus(dn),
- "Unexpected exception getting the nodeState");
- }
-
/**
* Retrieves the containerReplica set for a given container or fails the test
* if the container cannot be found. This is a helper method to allow the
@@ -668,61 +660,6 @@ public class TestDecommissionAndMaintenance {
return c.getDatanodeDetails();
}
- /**
- * Given a Datanode, return a string consisting of the hostname and one of
its
- * ports in the for host:post.
- * @param dn Datanode for which to retrieve the host:post string
- * @return host:port for the given DN.
- */
- private String getDNHostAndPort(DatanodeDetails dn) {
- return dn.getHostName() + ":" + dn.getPorts().get(0).getValue();
- }
-
- /**
- * Wait for the given datanode to reach the given operational state.
- * @param dn Datanode for which to check the state
- * @param state The state to wait for.
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private void waitForDnToReachOpState(DatanodeDetails dn,
- HddsProtos.NodeOperationalState state)
- throws TimeoutException, InterruptedException {
- GenericTestUtils.waitFor(
- () -> getNodeStatus(dn).getOperationalState().equals(state),
- 200, 30000);
- }
-
- /**
- * Wait for the given datanode to reach the given Health state.
- * @param dn Datanode for which to check the state
- * @param state The state to wait for.
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private void waitForDnToReachHealthState(DatanodeDetails dn,
- HddsProtos.NodeState state)
- throws TimeoutException, InterruptedException {
- GenericTestUtils.waitFor(
- () -> getNodeStatus(dn).getHealth().equals(state),
- 200, 30000);
- }
-
- /**
- * Wait for the given datanode to reach the given persisted state.
- * @param dn Datanode for which to check the state
- * @param state The state to wait for.
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private void waitForDnToReachPersistedOpState(DatanodeDetails dn,
- HddsProtos.NodeOperationalState state)
- throws TimeoutException, InterruptedException {
- GenericTestUtils.waitFor(
- () -> dn.getPersistedOpState().equals(state),
- 200, 30000);
- }
-
/**
* Get any container present in the cluster and wait to ensure 3 replicas
* have been reported before returning the container.
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeUtil.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeUtil.java
new file mode 100644
index 0000000000..1cb5ef792f
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeUtil.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.concurrent.TimeoutException;
+
/**
 * Utility class with static helper methods for checking and waiting on
 * datanode state (operational state, health state, persisted operational
 * state) in integration tests. Extracted so multiple test classes can share
 * the same polling helpers instead of each defining private copies.
 */
public final class TestNodeUtil {

  // Utility class - no instances.
  private TestNodeUtil() {
  }

  /**
   * Wait for the given datanode to reach the given operational state,
   * polling the NodeManager every 200ms for up to 30 seconds.
   * @param nodeManager NodeManager used to look up the node's status.
   * @param dn Datanode for which to check the state.
   * @param state The operational state to wait for.
   * @throws TimeoutException if the state is not reached within 30 seconds.
   * @throws InterruptedException if the waiting thread is interrupted.
   */
  public static void waitForDnToReachOpState(NodeManager nodeManager,
      DatanodeDetails dn, HddsProtos.NodeOperationalState state)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> getNodeStatus(nodeManager, dn)
            .getOperationalState().equals(state),
        200, 30000);
  }

  /**
   * Wait for the given datanode to reach the given health state,
   * polling the NodeManager every 200ms for up to 30 seconds.
   * @param nodeManager NodeManager used to look up the node's status.
   * @param dn Datanode for which to check the state.
   * @param state The health state to wait for.
   * @throws TimeoutException if the state is not reached within 30 seconds.
   * @throws InterruptedException if the waiting thread is interrupted.
   */
  public static void waitForDnToReachHealthState(NodeManager nodeManager,
      DatanodeDetails dn, HddsProtos.NodeState state)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> getNodeStatus(nodeManager, dn).getHealth().equals(state),
        200, 30000);
  }

  /**
   * Retrieves the NodeStatus for the given DN or fails the test if the
   * Node cannot be found. This is a helper method to allow the nodeStatus
   * to be checked in lambda expressions.
   * @param nodeManager NodeManager used to look up the node's status.
   * @param dn Datanode for which to retrieve the NodeStatus.
   * @return the NodeStatus reported by the NodeManager for the given DN.
   */
  public static NodeStatus getNodeStatus(NodeManager nodeManager,
      DatanodeDetails dn) {
    return Assertions.assertDoesNotThrow(
        () -> nodeManager.getNodeStatus(dn),
        "Unexpected exception getting the nodeState");
  }

  /**
   * Given a Datanode, return a string consisting of the hostname and one of
   * its ports in the form host:port.
   * @param dn Datanode for which to retrieve the host:port string.
   * @return host:port for the given DN, using the first port in the DN's
   *         port list.
   */
  public static String getDNHostAndPort(DatanodeDetails dn) {
    return dn.getHostName() + ":" + dn.getPorts().get(0).getValue();
  }

  /**
   * Wait for the given datanode to reach the given persisted operational
   * state, polling the DatanodeDetails object every 200ms for up to 30
   * seconds. Note this checks the state persisted on the DatanodeDetails
   * instance itself, not the NodeManager's view of the node.
   * @param dn Datanode for which to check the state.
   * @param state The persisted operational state to wait for.
   * @throws TimeoutException if the state is not reached within 30 seconds.
   * @throws InterruptedException if the waiting thread is interrupted.
   */
  public static void waitForDnToReachPersistedOpState(DatanodeDetails dn,
      HddsProtos.NodeOperationalState state)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> dn.getPersistedOpState().equals(state),
        200, 30000);
  }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAndAdminContainerCLI.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAndAdminContainerCLI.java
new file mode 100644
index 0000000000..8429269b24
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAndAdminContainerCLI.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.recon;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ScmUtils;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import
org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata;
+import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersResponse;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
+import org.apache.hadoop.hdds.scm.node.TestNodeUtil;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.LambdaTestUtils;
+import
org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptyMap;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_HEARTBEAT_INTERVAL;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
+
+/**
+ * Integration tests for ensuring Recon's consistency
+ * with the "ozone admin container" CLI.
+ */
+@Timeout(300)
+public class TestReconAndAdminContainerCLI {
+
+ private static final OzoneConfiguration CONF = new OzoneConfiguration();
+ private static ScmClient scmClient;
+ private static MiniOzoneCluster cluster;
+ private static NodeManager scmNodeManager;
+ private static long containerIdR3;
+ private static OzoneBucket ozoneBucket;
+ private static ContainerManager scmContainerManager;
+ private static ContainerManager reconContainerManager;
+
+ private static Stream<Arguments> outOfServiceNodeStateArgs() {
+ return Stream.of(
+ Arguments.of(NodeOperationalState.ENTERING_MAINTENANCE,
+ NodeOperationalState.IN_MAINTENANCE, true),
+ Arguments.of(NodeOperationalState.DECOMMISSIONING,
+ NodeOperationalState.DECOMMISSIONED, false)
+ );
+ }
+
+ @BeforeAll
+ public static void init() throws Exception {
+ setupConfigKeys();
+ cluster = MiniOzoneCluster.newBuilder(CONF)
+ .setNumDatanodes(5)
+ .includeRecon(true)
+ .build();
+ cluster.waitForClusterToBeReady();
+ GenericTestUtils.setLogLevel(ReconNodeManager.LOG, Level.DEBUG);
+
+ scmClient = new ContainerOperationClient(CONF);
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ PipelineManager scmPipelineManager = scm.getPipelineManager();
+ scmContainerManager = scm.getContainerManager();
+ scmNodeManager = scm.getScmNodeManager();
+
+ ReconStorageContainerManagerFacade reconScm =
+ (ReconStorageContainerManagerFacade)
+ cluster.getReconServer().getReconStorageContainerManager();
+ PipelineManager reconPipelineManager = reconScm.getPipelineManager();
+ reconContainerManager = reconScm.getContainerManager();
+
+ LambdaTestUtils.await(60000, 5000,
+ () -> (reconPipelineManager.getPipelines().size() >= 4));
+
+ // Verify that Recon has all the pipelines from SCM.
+ scmPipelineManager.getPipelines().forEach(p -> {
+ try {
+ Assertions.assertNotNull(reconPipelineManager.getPipeline(p.getId()));
+ } catch (PipelineNotFoundException e) {
+ Assertions.fail();
+ }
+ });
+
+ Assertions.assertTrue(scmContainerManager.getContainers().isEmpty());
+
+ // Verify that all nodes are registered with Recon.
+ NodeManager reconNodeManager = reconScm.getScmNodeManager();
+ Assertions.assertEquals(scmNodeManager.getAllNodes().size(),
+ reconNodeManager.getAllNodes().size());
+
+ OzoneClient client = cluster.newClient();
+ String volumeName = "vol1";
+ String bucketName = "bucket1";
+
+ ozoneBucket = TestDataUtil.createVolumeAndBucket(
+ client, volumeName, bucketName, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ String keyNameR3 = "key1";
+ containerIdR3 = setupRatisKey(keyNameR3,
+ HddsProtos.ReplicationFactor.THREE);
+ }
+
+ @AfterAll
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * It's the same regardless of the ReplicationConfig,
+ * but it's easier to test with Ratis ONE.
+ */
+ @Test
+ public void testMissingContainer() throws Exception {
+ String keyNameR1 = "key2";
+ long containerID = setupRatisKey(keyNameR1,
+ HddsProtos.ReplicationFactor.ONE);
+
+ Pipeline pipeline =
+ scmClient.getContainerWithPipeline(containerID).getPipeline();
+
+ for (DatanodeDetails details : pipeline.getNodes()) {
+ cluster.shutdownHddsDatanode(details);
+ }
+ TestHelper.waitForReplicaCount(containerID, 0, cluster);
+
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return scmClient.getReplicationManagerReport()
+ .getStat(ReplicationManagerReport.HealthState.MISSING) == 1;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 1000, 20000);
+
+ UnHealthyContainerStates containerStateForTesting =
+ UnHealthyContainerStates.MISSING;
+ compareRMReportToReconResponse(containerStateForTesting.toString());
+
+ for (DatanodeDetails details : pipeline.getNodes()) {
+ cluster.restartHddsDatanode(details, false);
+ TestNodeUtil.waitForDnToReachOpState(scmNodeManager, details,
IN_SERVICE);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("outOfServiceNodeStateArgs")
+ public void testNodesInDecommissionOrMaintenance(
+ NodeOperationalState initialState, NodeOperationalState finalState,
+ boolean isMaintenance) throws Exception {
+ Pipeline pipeline =
+ scmClient.getContainerWithPipeline(containerIdR3).getPipeline();
+
+ List<DatanodeDetails> details =
+ pipeline.getNodes().stream()
+ .filter(d -> d.getPersistedOpState().equals(IN_SERVICE))
+ .collect(Collectors.toList());
+
+ final DatanodeDetails nodeToGoOffline1 = details.get(0);
+ final DatanodeDetails nodeToGoOffline2 = details.get(1);
+
+ UnHealthyContainerStates underReplicatedState =
+ UnHealthyContainerStates.UNDER_REPLICATED;
+ UnHealthyContainerStates overReplicatedState =
+ UnHealthyContainerStates.OVER_REPLICATED;
+
+ // First node goes offline.
+ if (isMaintenance) {
+ scmClient.startMaintenanceNodes(Collections.singletonList(
+ TestNodeUtil.getDNHostAndPort(nodeToGoOffline1)), 0);
+ } else {
+ scmClient.decommissionNodes(Collections.singletonList(
+ TestNodeUtil.getDNHostAndPort(nodeToGoOffline1)));
+ }
+
+ TestNodeUtil.waitForDnToReachOpState(scmNodeManager,
+ nodeToGoOffline1, initialState);
+
+ compareRMReportToReconResponse(underReplicatedState.toString());
+ compareRMReportToReconResponse(overReplicatedState.toString());
+
+ TestNodeUtil.waitForDnToReachOpState(scmNodeManager,
+ nodeToGoOffline1, finalState);
+ // Every time a node goes into decommission,
+ // a new replica-copy is made to another node.
+ // For maintenance, there is no replica-copy in this case.
+ if (!isMaintenance) {
+ TestHelper.waitForReplicaCount(containerIdR3, 4, cluster);
+ }
+
+ compareRMReportToReconResponse(underReplicatedState.toString());
+ compareRMReportToReconResponse(overReplicatedState.toString());
+
+ // Second node goes offline.
+ if (isMaintenance) {
+ scmClient.startMaintenanceNodes(Collections.singletonList(
+ TestNodeUtil.getDNHostAndPort(nodeToGoOffline2)), 0);
+ } else {
+ scmClient.decommissionNodes(Collections.singletonList(
+ TestNodeUtil.getDNHostAndPort(nodeToGoOffline2)));
+ }
+
+ TestNodeUtil.waitForDnToReachOpState(scmNodeManager,
+ nodeToGoOffline2, initialState);
+
+ compareRMReportToReconResponse(underReplicatedState.toString());
+ compareRMReportToReconResponse(overReplicatedState.toString());
+
+ TestNodeUtil.waitForDnToReachOpState(scmNodeManager,
+ nodeToGoOffline2, finalState);
+
+ // There will be a replica copy for both maintenance and decommission.
+ // maintenance 3 -> 4, decommission 4 -> 5.
+ int expectedReplicaNum = isMaintenance ? 4 : 5;
+ TestHelper.waitForReplicaCount(containerIdR3, expectedReplicaNum, cluster);
+
+ compareRMReportToReconResponse(underReplicatedState.toString());
+ compareRMReportToReconResponse(overReplicatedState.toString());
+
+ scmClient.recommissionNodes(Arrays.asList(
+ TestNodeUtil.getDNHostAndPort(nodeToGoOffline1),
+ TestNodeUtil.getDNHostAndPort(nodeToGoOffline2)));
+
+ TestNodeUtil.waitForDnToReachOpState(scmNodeManager,
+ nodeToGoOffline1, IN_SERVICE);
+ TestNodeUtil.waitForDnToReachOpState(scmNodeManager,
+ nodeToGoOffline2, IN_SERVICE);
+
+ TestNodeUtil.waitForDnToReachPersistedOpState(nodeToGoOffline1,
IN_SERVICE);
+ TestNodeUtil.waitForDnToReachPersistedOpState(nodeToGoOffline2,
IN_SERVICE);
+
+ compareRMReportToReconResponse(underReplicatedState.toString());
+ compareRMReportToReconResponse(overReplicatedState.toString());
+ }
+
+ /**
+ * The purpose of this method, isn't to validate the numbers
+ * but to make sure that they are consistent between
+ * Recon and the ReplicationManager.
+ */
+ private static void compareRMReportToReconResponse(String containerState)
+ throws Exception {
+ Assertions.assertFalse(Strings.isNullOrEmpty(containerState));
+
+ ReplicationManagerReport rmReport =
scmClient.getReplicationManagerReport();
+ UnhealthyContainersResponse reconResponse =
+ TestReconEndpointUtil
+ .getUnhealthyContainersFromRecon(CONF, containerState);
+
+ long rmMissingCounter = rmReport.getStat(
+ ReplicationManagerReport.HealthState.MISSING);
+ long rmUnderReplCounter = rmReport.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED);
+ long rmOverReplCounter = rmReport.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED);
+ long rmMisReplCounter = rmReport.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED);
+
+ // Both threads are running every 1 second.
+ // Wait until all values are equal.
+ GenericTestUtils.waitFor(
+ () -> rmMissingCounter == reconResponse.getMissingCount() &&
+ rmUnderReplCounter ==
reconResponse.getUnderReplicatedCount() &&
+ rmOverReplCounter ==
reconResponse.getOverReplicatedCount() &&
+ rmMisReplCounter == reconResponse.getMisReplicatedCount(),
+ 1000, 40000);
+
+ // Recon's UnhealthyContainersResponse contains a list of containers
+ // for a particular state. Check if RM's sample of containers can be
+ // found in Recon's list of containers for a particular state.
+ HealthState rmState = HealthState.UNHEALTHY;
+
+ if (UnHealthyContainerStates.valueOf(containerState)
+ .equals(UnHealthyContainerStates.MISSING) &&
+ rmMissingCounter > 0) {
+ rmState = HealthState.MISSING;
+ } else if (UnHealthyContainerStates.valueOf(containerState)
+ .equals(UnHealthyContainerStates.UNDER_REPLICATED) &&
+ rmUnderReplCounter > 0) {
+ rmState = HealthState.UNDER_REPLICATED;
+ } else if (UnHealthyContainerStates.valueOf(containerState)
+ .equals(UnHealthyContainerStates.OVER_REPLICATED) &&
+ rmOverReplCounter > 0) {
+ rmState = HealthState.OVER_REPLICATED;
+ } else if (UnHealthyContainerStates.valueOf(containerState)
+ .equals(UnHealthyContainerStates.MIS_REPLICATED) &&
+ rmMisReplCounter > 0) {
+ rmState = HealthState.MIS_REPLICATED;
+ }
+
+ List<ContainerID> rmContainerIDs = rmReport.getSample(rmState);
+ List<Long> rmIDsToLong = new ArrayList<>();
+ for (ContainerID id : rmContainerIDs) {
+ rmIDsToLong.add(id.getId());
+ }
+ List<Long> reconContainerIDs =
+ reconResponse.getContainers()
+ .stream()
+ .map(UnhealthyContainerMetadata::getContainerID)
+ .collect(Collectors.toList());
+ Assertions.assertTrue(reconContainerIDs.containsAll(rmIDsToLong));
+ }
+
+ private static long setupRatisKey(String keyName,
+ HddsProtos.ReplicationFactor replicationFactor) throws Exception {
+ OmKeyInfo omKeyInfo = createTestKey(keyName,
+ RatisReplicationConfig.getInstance(replicationFactor));
+
+ // Sync Recon with OM, to force it to get the new key entries.
+ TestReconEndpointUtil.triggerReconDbSyncWithOm(CONF);
+
+ List<Long> containerIDs = getContainerIdsForKey(omKeyInfo);
+ // The list has only 1 containerID.
+ Assertions.assertEquals(1, containerIDs.size());
+ long containerID = containerIDs.get(0);
+
+ // Verify Recon picked up the new container.
+ Assertions.assertEquals(scmContainerManager.getContainers(),
+ reconContainerManager.getContainers());
+
+ ReconContainerMetadataManager reconContainerMetadataManager =
+ cluster.getReconServer().getReconContainerMetadataManager();
+
+ // Verify Recon picked up the new keys and
+ // updated its container key mappings.
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return reconContainerMetadataManager
+ .getKeyCountForContainer(containerID) > 0;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 1000, 20000);
+
+ return containerID;
+ }
+
+ private static OmKeyInfo createTestKey(String keyName,
+ ReplicationConfig replicationConfig)
+ throws IOException {
+ byte[] textBytes = "Testing".getBytes(UTF_8);
+ try (OutputStream out = ozoneBucket.createKey(keyName,
+ textBytes.length, replicationConfig, emptyMap())) {
+ out.write(textBytes);
+ }
+
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(ozoneBucket.getVolumeName())
+ .setBucketName(ozoneBucket.getName())
+ .setKeyName(keyName)
+ .build();
+ return cluster.getOzoneManager().lookupKey(keyArgs);
+ }
+
+ private static List<Long> getContainerIdsForKey(OmKeyInfo omKeyInfo) {
+ Assertions.assertNotNull(omKeyInfo.getLatestVersionLocations());
+ List<OmKeyLocationInfo> locations =
+ omKeyInfo.getLatestVersionLocations().getLocationList();
+
+ List<Long> ids = new ArrayList<>();
+ for (OmKeyLocationInfo location : locations) {
+ ids.add(location.getContainerID());
+ }
+ return ids;
+ }
+
+ private static void setupConfigKeys() {
+ CONF.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+ 100, TimeUnit.MILLISECONDS);
+ CONF.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
+ CONF.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
+ CONF.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS);
+ CONF.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 1, SECONDS);
+ CONF.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS);
+ CONF.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
+ CONF.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+ CONF.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+ CONF.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+ 1, SECONDS);
+ CONF.setTimeDuration(
+ ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL,
+ 1, SECONDS);
+
CONF.setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ 0, SECONDS);
+ CONF.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
+ CONF.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
+ CONF.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
+
+ CONF.setTimeDuration(HDDS_RECON_HEARTBEAT_INTERVAL,
+ 1, TimeUnit.SECONDS);
+ CONF.setTimeDuration(OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY,
+ 1, TimeUnit.SECONDS);
+
+ CONF.set(ScmUtils.getContainerReportConfPrefix() +
+ ".queue.wait.threshold", "1");
+ CONF.set(ScmUtils.getContainerReportConfPrefix() +
+ ".execute.wait.threshold", "1");
+
+ ReconTaskConfig reconTaskConfig = CONF.getObject(ReconTaskConfig.class);
+ reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(1));
+ CONF.setFromObject(reconTaskConfig);
+
+ ReplicationManager.ReplicationManagerConfiguration replicationConf =
+ CONF.getObject(ReplicationManager
+ .ReplicationManagerConfiguration.class);
+ replicationConf.setInterval(Duration.ofSeconds(1));
+ replicationConf.setUnderReplicatedInterval(Duration.ofSeconds(1));
+ replicationConf.setOverReplicatedInterval(Duration.ofSeconds(1));
+ CONF.setFromObject(replicationConf);
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconEndpointUtil.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconEndpointUtil.java
new file mode 100644
index 0000000000..5f5f8b2351
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconEndpointUtil.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.recon;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_DEFAULT;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_HTTPS_ADDRESS_DEFAULT;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_HTTPS_ADDRESS_KEY;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_HTTP_ADDRESS_DEFAULT;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.server.http.HttpConfig.getHttpPolicy;
+import static org.apache.hadoop.http.HttpServer2.HTTPS_SCHEME;
+import static org.apache.hadoop.http.HttpServer2.HTTP_SCHEME;
+
+/**
+ * Utility class, used by integration tests,
+ * for getting responses from Recon Endpoints.
+ */
+public final class TestReconEndpointUtil {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestReconEndpointUtil.class);
+
+ private static final String CONTAINER_ENDPOINT = "/api/v1/containers";
+ private static final String OM_DB_SYNC_ENDPOINT = "/api/v1/triggerdbsync/om";
+
+ private TestReconEndpointUtil() {
+ }
+
+ public static void triggerReconDbSyncWithOm(
+ OzoneConfiguration conf) {
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf))
+ .append(OM_DB_SYNC_ENDPOINT);
+
+ String response = "";
+ try {
+ response = makeHttpCall(conf, urlBuilder);
+ } catch (Exception e) {
+ LOG.error("Error getting db sync response from Recon");
+ }
+
+ if (!Strings.isNullOrEmpty(response) &&
+ !response.equals("true")) {
+ LOG.error("Triggering Recon DB sync with OM failed.");
+ }
+ }
+
+ public static UnhealthyContainersResponse getUnhealthyContainersFromRecon(
+ OzoneConfiguration conf, String containerState)
+ throws JsonProcessingException {
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append(getReconWebAddress(conf))
+ .append(CONTAINER_ENDPOINT)
+ .append("/unhealthy/")
+ .append(containerState);
+
+ String containersResponse = "";
+ try {
+ containersResponse = makeHttpCall(conf, urlBuilder);
+ } catch (Exception e) {
+ LOG.error("Error getting unhealthy containers response from Recon");
+ }
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+
+ return objectMapper.readValue(containersResponse,
+ UnhealthyContainersResponse.class);
+ }
+
+ public static String makeHttpCall(OzoneConfiguration conf, StringBuilder url)
+ throws Exception {
+
+ System.out.println("Connecting to Recon: " + url + " ...");
+ final URLConnectionFactory connectionFactory =
+ URLConnectionFactory.newDefaultURLConnectionFactory(conf);
+
+ boolean isSpnegoEnabled = isHTTPSEnabled(conf);
+ HttpURLConnection httpURLConnection;
+
+ try {
+ httpURLConnection = (HttpURLConnection) connectionFactory.openConnection(
+ new URL(url.toString()), isSpnegoEnabled);
+ httpURLConnection.connect();
+ int errorCode = httpURLConnection.getResponseCode();
+ InputStream inputStream = httpURLConnection.getInputStream();
+
+ if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
+ return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+ }
+
+ if (httpURLConnection.getErrorStream() != null) {
+ System.out.println("Recon is being initialized. " +
+ "Please wait a moment");
+ return null;
+ } else {
+ System.out.println("Unexpected null in http payload," +
+ " while processing request");
+ }
+ return null;
+ } catch (ConnectException ex) {
+ System.err.println("Connection Refused. Please make sure the " +
+ "Recon Server has been started.");
+ return null;
+ }
+ }
+
+ public static String getReconWebAddress(OzoneConfiguration conf) {
+ final String protocol;
+ final HttpConfig.Policy webPolicy = getHttpPolicy(conf);
+
+ final boolean isHostDefault;
+ String host;
+
+ if (webPolicy.isHttpsEnabled()) {
+ protocol = HTTPS_SCHEME;
+ host = conf.get(OZONE_RECON_HTTPS_ADDRESS_KEY,
+ OZONE_RECON_HTTPS_ADDRESS_DEFAULT);
+ isHostDefault = getHostOnly(host).equals(
+ getHostOnly(OZONE_RECON_HTTPS_ADDRESS_DEFAULT));
+ } else {
+ protocol = HTTP_SCHEME;
+ host = conf.get(OZONE_RECON_HTTP_ADDRESS_KEY,
+ OZONE_RECON_HTTP_ADDRESS_DEFAULT);
+ isHostDefault = getHostOnly(host).equals(
+ getHostOnly(OZONE_RECON_HTTP_ADDRESS_DEFAULT));
+ }
+
+ if (isHostDefault) {
+ // Fallback to <Recon RPC host name>:<Recon http(s) address port>
+ final String rpcHost =
+ conf.get(OZONE_RECON_ADDRESS_KEY, OZONE_RECON_ADDRESS_DEFAULT);
+ host = getHostOnly(rpcHost) + ":" + getPort(host);
+ }
+
+ return protocol + "://" + host;
+ }
+
+ public static String getHostOnly(String host) {
+ return host.split(":", 2)[0];
+ }
+
+ public static String getPort(String host) {
+ return host.split(":", 2)[1];
+ }
+
+ public static boolean isHTTPSEnabled(OzoneConfiguration conf) {
+ return getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY;
+ }
+
+}
+
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainerMetadata.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainerMetadata.java
index 808e85dd2f..42564412b1 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainerMetadata.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainerMetadata.java
@@ -76,6 +76,10 @@ public class UnhealthyContainerMetadata {
this.keys = keyCount;
}
+ // Default constructor, used by jackson lib for object deserialization.
+ public UnhealthyContainerMetadata() {
+ }
+
public long getContainerID() {
return containerID;
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java
index ef40329c80..eaf08d9ca8 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java
@@ -61,6 +61,10 @@ public class UnhealthyContainersResponse {
this.containers = containers;
}
+ // Default constructor, used by jackson lib for object deserialization.
+ public UnhealthyContainersResponse() {
+ }
+
public void setSummaryCount(String state, long count) {
if (state.equals(UnHealthyContainerStates.MISSING.toString())) {
this.missingCount = count;
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
index 7785e01a37..5e6d55ce70 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.recon.fsck;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -24,9 +25,15 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount;
+import
org.apache.hadoop.hdds.scm.container.replication.ECContainerReplicaCount;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import java.io.IOException;
+import
org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -38,31 +45,46 @@ import java.util.stream.Collectors;
public class ContainerHealthStatus {
- private ContainerInfo container;
- private int replicaDelta;
- private Set<ContainerReplica> healthyReplicas;
- private ContainerPlacementStatus placementStatus;
- private ReconContainerMetadataManager reconContainerMetadataManager;
- private int numReplicas;
- private long numKeys;
+ private final ContainerInfo container;
+ private final int replicaDelta;
+ private final Set<ContainerReplica> healthyReplicas;
+ private final Set<ContainerReplica> healthyAvailReplicas;
+ private final ContainerPlacementStatus placementStatus;
+ private final ReconContainerMetadataManager reconContainerMetadataManager;
+ private final int numReplicas;
+ private final long numKeys;
+ private final ContainerReplicaCount containerReplicaCount;
ContainerHealthStatus(ContainerInfo container,
- Set<ContainerReplica> healthyReplicas,
+ Set<ContainerReplica> replicas,
PlacementPolicy placementPolicy,
ReconContainerMetadataManager
- reconContainerMetadataManager) {
+ reconContainerMetadataManager,
+ OzoneConfiguration conf) {
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.container = container;
int repFactor = container.getReplicationConfig().getRequiredNodes();
- this.healthyReplicas = healthyReplicas
+ this.healthyReplicas = replicas
.stream()
.filter(r -> !r.getState()
.equals((ContainerReplicaProto.State.UNHEALTHY)))
.collect(Collectors.toSet());
- this.replicaDelta = repFactor - this.healthyReplicas.size();
+ this.healthyAvailReplicas = replicas
+ .stream()
+ // Filter out unhealthy replicas and
+ // replicas belonging to out-of-service nodes.
+ .filter(r ->
+ (!r.getDatanodeDetails().isDecommissioned() &&
+ !r.getDatanodeDetails().isMaintenance() &&
+ !r.getState().equals(ContainerReplicaProto.State.UNHEALTHY)))
+ .collect(Collectors.toSet());
+ this.replicaDelta = repFactor - this.healthyAvailReplicas.size();
this.placementStatus = getPlacementStatus(placementPolicy, repFactor);
- this.numReplicas = healthyReplicas.size();
+ this.numReplicas = replicas.size();
this.numKeys = getContainerKeyCount(container.getContainerID());
+
+ this.containerReplicaCount =
+ getContainerReplicaCountInstance(conf, replicas);
}
public long getContainerID() {
@@ -78,6 +100,14 @@ public class ContainerHealthStatus {
}
public boolean isHealthy() {
+ return containerReplicaCount.isHealthy();
+ }
+
+ public boolean isSufficientlyReplicated() {
+ return containerReplicaCount.isSufficientlyReplicated();
+ }
+
+ public boolean isHealthilyReplicated() {
return replicaDelta == 0 && !isMisReplicated();
}
@@ -87,11 +117,11 @@ public class ContainerHealthStatus {
}
public boolean isOverReplicated() {
- return replicaDelta < 0;
+ return containerReplicaCount.isOverReplicated();
}
public boolean isUnderReplicated() {
- return !isMissing() && replicaDelta > 0;
+ return !isMissing() && !containerReplicaCount.isSufficientlyReplicated();
}
public int replicaDelta() {
@@ -99,7 +129,7 @@ public class ContainerHealthStatus {
}
public int getReplicaCount() {
- return healthyReplicas.size();
+ return healthyAvailReplicas.size();
}
public boolean isMisReplicated() {
@@ -150,4 +180,21 @@ public class ContainerHealthStatus {
public long getNumKeys() {
return numKeys;
}
+
+ private ContainerReplicaCount getContainerReplicaCountInstance(
+ OzoneConfiguration conf, Set<ContainerReplica> replicas) {
+ ReplicationManager.ReplicationManagerConfiguration rmConf = conf.getObject(
+ ReplicationManager.ReplicationManagerConfiguration.class);
+ boolean isEC = container.getReplicationConfig()
+ .getReplicationType() == HddsProtos.ReplicationType.EC;
+ return isEC ?
+ new ECContainerReplicaCount(container,
+ replicas, new ArrayList<>(),
+ rmConf.getMaintenanceRemainingRedundancy()) :
+ // This class ignores unhealthy replicas,
+ // therefore set 'considerUnhealthy' to false.
+ new RatisContainerReplicaCount(container,
+ replicas, new ArrayList<>(),
+ rmConf.getMaintenanceReplicaMinimum(), false);
+ }
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 4296dca366..577fb7d2bc 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -68,17 +69,20 @@ public class ContainerHealthTask extends ReconScmTask {
LoggerFactory.getLogger(ContainerHealthTask.class);
public static final int FETCH_COUNT = Integer.parseInt(DEFAULT_FETCH_COUNT);
- private ReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
- private StorageContainerServiceProvider scmClient;
- private ContainerManager containerManager;
- private ContainerHealthSchemaManager containerHealthSchemaManager;
- private ReconContainerMetadataManager reconContainerMetadataManager;
- private PlacementPolicy placementPolicy;
+ private final StorageContainerServiceProvider scmClient;
+ private final ContainerManager containerManager;
+ private final ContainerHealthSchemaManager containerHealthSchemaManager;
+ private final ReconContainerMetadataManager reconContainerMetadataManager;
+ private final PlacementPolicy placementPolicy;
private final long interval;
- private Set<ContainerInfo> processedContainers = new HashSet<>();
+ private final Set<ContainerInfo> processedContainers = new HashSet<>();
+ private final OzoneConfiguration conf;
+
+ @SuppressWarnings("checkstyle:ParameterNumber")
public ContainerHealthTask(
ContainerManager containerManager,
StorageContainerServiceProvider scmClient,
@@ -86,13 +90,15 @@ public class ContainerHealthTask extends ReconScmTask {
ContainerHealthSchemaManager containerHealthSchemaManager,
PlacementPolicy placementPolicy,
ReconTaskConfig reconTaskConfig,
- ReconContainerMetadataManager reconContainerMetadataManager) {
+ ReconContainerMetadataManager reconContainerMetadataManager,
+ OzoneConfiguration conf) {
super(reconTaskStatusDao);
this.scmClient = scmClient;
this.containerHealthSchemaManager = containerHealthSchemaManager;
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.placementPolicy = placementPolicy;
this.containerManager = containerManager;
+ this.conf = conf;
interval = reconTaskConfig.getMissingContainerTaskInterval().toMillis();
}
@@ -220,7 +226,7 @@ public class ContainerHealthTask extends ReconScmTask {
Set<ContainerReplica> replicas =
containerManager.getContainerReplicas(container.containerID());
return new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, conf);
}
private void completeProcessingContainer(
@@ -312,8 +318,9 @@ public class ContainerHealthTask extends ReconScmTask {
Set<ContainerReplica> containerReplicas =
containerManager.getContainerReplicas(container.containerID());
ContainerHealthStatus h = new ContainerHealthStatus(container,
- containerReplicas, placementPolicy, reconContainerMetadataManager);
- if (h.isHealthy() || h.isDeleted()) {
+ containerReplicas, placementPolicy,
+ reconContainerMetadataManager, conf);
+ if (h.isHealthilyReplicated() || h.isDeleted()) {
return;
}
// For containers deleted in SCM, we sync the container state here.
@@ -426,7 +433,7 @@ public class ContainerHealthTask extends ReconScmTask {
Map<UnHealthyContainerStates, Map<String, Long>>
unhealthyContainerStateStatsMap) {
List<UnhealthyContainers> records = new ArrayList<>();
- if (container.isHealthy() || container.isDeleted()) {
+ if (container.isHealthilyReplicated() || container.isDeleted()) {
return records;
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java
index 9a0dccdc99..32b479a19e 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHistory.java
@@ -30,7 +30,7 @@ public class ContainerHistory implements Serializable {
private String datanodeHost;
private long firstSeenTime;
private long lastSeenTime;
- private long bcsId;
+ private long lastBcsId;
private String state;
public ContainerHistory(long containerId, String datanodeUuid,
@@ -41,12 +41,16 @@ public class ContainerHistory implements Serializable {
this.datanodeHost = datanodeHost;
this.firstSeenTime = firstSeenTime;
this.lastSeenTime = lastSeenTime;
- this.bcsId = lastBcsId;
+ this.lastBcsId = lastBcsId;
this.state = state;
}
+ // Default constructor, used by jackson lib for object deserialization.
+ public ContainerHistory() {
+ }
+
public long getLastBcsId() {
- return bcsId;
+ return lastBcsId;
}
public long getContainerId() {
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 556c619419..046662398f 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -268,7 +268,7 @@ public class ReconStorageContainerManagerFacade
ContainerHealthTask containerHealthTask = new ContainerHealthTask(
containerManager, scmServiceProvider, reconTaskStatusDao,
containerHealthSchemaManager, containerPlacementPolicy,
reconTaskConfig,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, conf);
this.containerSizeCountTask = new ContainerSizeCountTask(
containerManager,
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
index 0259eef46f..d404a168c7 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon.fsck;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -29,8 +30,12 @@ import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashSet;
import java.util.Set;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -49,15 +54,28 @@ public class TestContainerHealthStatus {
private PlacementPolicy placementPolicy;
private ContainerInfo container;
private ReconContainerMetadataManager reconContainerMetadataManager;
+ private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+ private static Stream<Arguments> outOfServiceNodeStates() {
+ return Stream.of(
+ Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING),
+ Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONED),
+ Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE),
+ Arguments.of(HddsProtos.NodeOperationalState.IN_MAINTENANCE)
+ );
+ }
@BeforeEach
public void setup() {
placementPolicy = mock(PlacementPolicy.class);
container = mock(ContainerInfo.class);
reconContainerMetadataManager = mock(ReconContainerMetadataManager.class);
+ when(container.getReplicationFactor())
+ .thenReturn(HddsProtos.ReplicationFactor.THREE);
when(container.getReplicationConfig())
.thenReturn(RatisReplicationConfig
.getInstance(HddsProtos.ReplicationFactor.THREE));
+ when(container.getState()).thenReturn(HddsProtos.LifeCycleState.CLOSED);
when(container.containerID()).thenReturn(ContainerID.valueOf(123456));
when(container.getContainerID()).thenReturn((long)123456);
when(placementPolicy.validateContainerPlacement(
@@ -73,8 +91,8 @@ public class TestContainerHealthStatus {
ContainerReplicaProto.State.CLOSED);
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
- assertTrue(status.isHealthy());
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthilyReplicated());
assertFalse(status.isOverReplicated());
assertFalse(status.isUnderReplicated());
assertEquals(0, status.replicaDelta());
@@ -97,8 +115,8 @@ public class TestContainerHealthStatus {
ContainerReplicaProto.State.UNHEALTHY);
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
- assertTrue(status.isHealthy());
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthilyReplicated());
assertFalse(status.isOverReplicated());
assertFalse(status.isUnderReplicated());
assertEquals(0, status.replicaDelta());
@@ -112,8 +130,8 @@ public class TestContainerHealthStatus {
Set<ContainerReplica> replicas = new HashSet<>();
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
- assertFalse(status.isHealthy());
+ reconContainerMetadataManager, CONF);
+ assertFalse(status.isHealthilyReplicated());
assertFalse(status.isOverReplicated());
assertFalse(status.isUnderReplicated());
assertEquals(3, status.replicaDelta());
@@ -128,8 +146,8 @@ public class TestContainerHealthStatus {
ContainerReplicaProto.State.CLOSED);
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
- assertFalse(status.isHealthy());
+ reconContainerMetadataManager, CONF);
+ assertFalse(status.isHealthilyReplicated());
assertFalse(status.isMissing());
assertFalse(status.isOverReplicated());
assertTrue(status.isUnderReplicated());
@@ -147,8 +165,8 @@ public class TestContainerHealthStatus {
ContainerReplicaProto.State.CLOSED);
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
- assertFalse(status.isHealthy());
+ reconContainerMetadataManager, CONF);
+ assertFalse(status.isHealthilyReplicated());
assertFalse(status.isMissing());
assertFalse(status.isUnderReplicated());
assertTrue(status.isOverReplicated());
@@ -157,6 +175,185 @@ public class TestContainerHealthStatus {
assertEquals(0, status.misReplicatedDelta());
}
+ /**
+ * Starting with a ContainerHealthStatus for a container that has one
+ * excess (over-replicated) replica, then moving one datanode to one of the out-of-service states.
+ * Replicas belonging to out-of-service nodes should be ignored and
+ * the container should be considered properly replicated.
+ */
+ @ParameterizedTest
+ @MethodSource("outOfServiceNodeStates")
+ public void testOverReplicationWithOutOfServiceNodes(
+ HddsProtos.NodeOperationalState state) {
+ Set<ContainerReplica> replicas = generateReplicas(container,
+ ContainerReplicaProto.State.CLOSED,
+ ContainerReplicaProto.State.CLOSED,
+ ContainerReplicaProto.State.CLOSED,
+ ContainerReplicaProto.State.CLOSED);
+ ContainerHealthStatus status =
+ new ContainerHealthStatus(container, replicas, placementPolicy,
+ reconContainerMetadataManager, CONF);
+ assertFalse(status.isHealthilyReplicated());
+ assertFalse(status.isMissing());
+ assertFalse(status.isUnderReplicated());
+ assertFalse(status.isMisReplicated());
+ assertTrue(status.isOverReplicated());
+
+ for (ContainerReplica replica : replicas) {
+ replicas.remove(replica);
+ replica.getDatanodeDetails().setPersistedOpState(state);
+ replicas.add(replica);
+ break;
+ }
+
+ status = new ContainerHealthStatus(container, replicas, placementPolicy,
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthilyReplicated());
+ assertFalse(status.isMissing());
+ assertFalse(status.isUnderReplicated());
+ assertFalse(status.isMisReplicated());
+ assertFalse(status.isOverReplicated());
+ }
+
+ /**
+ * Nodes in Decommission aren't expected to come back.
+ * If one of the three nodes goes into decommission, the container is
+ * considered under-replicated. If one of the three nodes goes into maintenance,
+ * because the node is expected to come back and there are
+ * 2 available replicas (minimum required num for Ratis THREE)
+ * the container isn't considered under-replicated.
+ */
+ @ParameterizedTest
+ @MethodSource("outOfServiceNodeStates")
+ public void testUnderReplicationWithOutOfServiceNodes(
+ HddsProtos.NodeOperationalState state) {
+ Set<ContainerReplica> replicas = generateReplicas(container,
+ ContainerReplicaProto.State.CLOSED,
+ ContainerReplicaProto.State.CLOSED,
+ ContainerReplicaProto.State.CLOSED);
+ // IN_SERVICE, IN_SERVICE, IN_SERVICE
+ ContainerHealthStatus status =
+ new ContainerHealthStatus(container, replicas, placementPolicy,
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthy());
+ assertTrue(status.isSufficientlyReplicated());
+ assertTrue(status.isHealthilyReplicated());
+ assertFalse(status.isMissing());
+ assertFalse(status.isUnderReplicated());
+ assertFalse(status.isMisReplicated());
+ assertFalse(status.isOverReplicated());
+
+ for (ContainerReplica replica : replicas) {
+ replicas.remove(replica);
+ replica.getDatanodeDetails().setPersistedOpState(state);
+ replicas.add(replica);
+ break;
+ }
+
+ // IN_SERVICE, IN_SERVICE, DECOMMISSION/MAINTENANCE
+ status = new ContainerHealthStatus(container, replicas, placementPolicy,
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthy());
+ assertFalse(status.isHealthilyReplicated());
+ assertFalse(status.isMissing());
+ assertFalse(status.isMisReplicated());
+ assertFalse(status.isOverReplicated());
+
+ if (state.equals(HddsProtos.NodeOperationalState.DECOMMISSIONING) ||
+ state.equals(HddsProtos.NodeOperationalState.DECOMMISSIONED)) {
+ assertFalse(status.isSufficientlyReplicated());
+ assertTrue(status.isUnderReplicated());
+ } else {
+ assertTrue(status.isSufficientlyReplicated());
+ assertFalse(status.isUnderReplicated());
+ }
+ }
+
+ /**
+ * Starting with a healthy ContainerHealthStatus and then updating
+ * a datanode to a maintenance state.
+ * Any node in maintenance is expected to come back and since 2 replicas
+ * in online nodes are meeting the minimum requirement for
+ * proper replication, no additional replica-copy is made.
+ *
+ * IN_SERVICE, IN_SERVICE, IN_MAINTENANCE
+ *
+ * If 1 more node goes into maintenance, then 1 replica copy is made to
+ * maintain the minimum requirement for proper replication.
+ *
+ * IN_SERVICE, IN_SERVICE, IN_MAINTENANCE, IN_MAINTENANCE
+ *
+ * Before the copy is made we have
+ *
+ * IN_SERVICE, IN_MAINTENANCE, ENTERING_MAINTENANCE
+ *
+ * for that short time, the container is under-replicated.
+ *
+ * When the copy is made, the container is again considered
+ * sufficiently replicated.
+ */
+ @Test
+ public void testReplicationWithNodesInMaintenance() {
+ Set<ContainerReplica> replicas = generateReplicas(container,
+ ContainerReplicaProto.State.CLOSED,
+ ContainerReplicaProto.State.CLOSED,
+ ContainerReplicaProto.State.CLOSED);
+ // IN_SERVICE, IN_SERVICE, IN_SERVICE
+ ContainerHealthStatus status =
+ new ContainerHealthStatus(container, replicas, placementPolicy,
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthy());
+ assertTrue(status.isSufficientlyReplicated());
+ assertTrue(status.isHealthilyReplicated());
+ assertFalse(status.isMissing());
+ assertFalse(status.isUnderReplicated());
+ assertFalse(status.isMisReplicated());
+ assertFalse(status.isOverReplicated());
+
+ // One of the 3 replicas goes into maintenance
+ // IN_SERVICE, IN_SERVICE, IN_MAINTENANCE
+ for (ContainerReplica replica : replicas) {
+ replicas.remove(replica);
+ replica.getDatanodeDetails().setPersistedOpState(
+ HddsProtos.NodeOperationalState.IN_MAINTENANCE);
+ replicas.add(replica);
+ break;
+ }
+
+ status = new ContainerHealthStatus(container, replicas, placementPolicy,
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthy());
+ assertTrue(status.isSufficientlyReplicated());
+ assertFalse(status.isHealthilyReplicated());
+ assertFalse(status.isMissing());
+ assertFalse(status.isUnderReplicated());
+ assertFalse(status.isMisReplicated());
+ assertFalse(status.isOverReplicated());
+
+ // IN_SERVICE, IN_MAINTENANCE, ENTERING_MAINTENANCE
+ for (ContainerReplica replica : replicas) {
+ if (replica.getDatanodeDetails().getPersistedOpState().equals(
+ HddsProtos.NodeOperationalState.IN_SERVICE)) {
+ replicas.remove(replica);
+ replica.getDatanodeDetails().setPersistedOpState(
+ HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE);
+ replicas.add(replica);
+ break;
+ }
+ }
+
+ // Container should be under-replicated.
+ status = new ContainerHealthStatus(container, replicas, placementPolicy,
+ reconContainerMetadataManager, CONF);
+ assertTrue(status.isHealthy());
+ assertFalse(status.isSufficientlyReplicated());
+ assertFalse(status.isHealthilyReplicated());
+ assertFalse(status.isMissing());
+ assertTrue(status.isUnderReplicated());
+ assertFalse(status.isMisReplicated());
+ assertFalse(status.isOverReplicated());
+ }
+
@Test
public void testMisReplicated() {
Set<ContainerReplica> replicas = generateReplicas(container,
@@ -168,8 +365,8 @@ public class TestContainerHealthStatus {
.thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
- assertFalse(status.isHealthy());
+ reconContainerMetadataManager, CONF);
+ assertFalse(status.isHealthilyReplicated());
assertFalse(status.isMissing());
assertFalse(status.isUnderReplicated());
assertFalse(status.isOverReplicated());
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index f4e2681083..371fb6f9d6 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -156,7 +157,8 @@ public class TestContainerHealthTask extends
AbstractReconSqlDBTest {
new ContainerHealthTask(scmMock.getContainerManager(),
scmMock.getScmServiceProvider(),
reconTaskStatusDao, containerHealthSchemaManager,
- placementMock, reconTaskConfig, reconContainerMetadataManager);
+ placementMock, reconTaskConfig,
+ reconContainerMetadataManager, new OzoneConfiguration());
containerHealthTask.start();
LambdaTestUtils.await(60000, 1000, () ->
(unHealthyContainersTableHandle.count() == 6));
@@ -320,7 +322,8 @@ public class TestContainerHealthTask extends
AbstractReconSqlDBTest {
new ContainerHealthTask(scmMock.getContainerManager(),
scmMock.getScmServiceProvider(),
reconTaskStatusDao, containerHealthSchemaManager,
- placementMock, reconTaskConfig, reconContainerMetadataManager);
+ placementMock, reconTaskConfig,
+ reconContainerMetadataManager, new OzoneConfiguration());
containerHealthTask.start();
LambdaTestUtils.await(6000, 1000, () ->
(unHealthyContainersTableHandle.count() == 2));
@@ -362,6 +365,9 @@ public class TestContainerHealthTask extends
AbstractReconSqlDBTest {
when(c.getReplicationConfig())
.thenReturn(RatisReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.THREE));
+ when(c.getReplicationFactor())
+ .thenReturn(HddsProtos.ReplicationFactor.THREE);
+ when(c.getState()).thenReturn(HddsProtos.LifeCycleState.CLOSED);
when(c.containerID()).thenReturn(ContainerID.valueOf(i));
containers.add(c);
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
index b5ceeed65c..7d55e612ba 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon.fsck;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -64,6 +65,7 @@ public class TestContainerHealthTaskRecordGenerator {
private ContainerInfo container;
private ContainerInfo emptyContainer;
private ReconContainerMetadataManager reconContainerMetadataManager;
+ private static final OzoneConfiguration CONF = new OzoneConfiguration();
@BeforeEach
public void setup() throws IOException {
@@ -71,14 +73,19 @@ public class TestContainerHealthTaskRecordGenerator {
container = mock(ContainerInfo.class);
emptyContainer = mock(ContainerInfo.class);
reconContainerMetadataManager = mock(ReconContainerMetadataManager.class);
+ when(container.getReplicationFactor())
+ .thenReturn(HddsProtos.ReplicationFactor.THREE);
when(container.getReplicationConfig())
.thenReturn(
RatisReplicationConfig
.getInstance(HddsProtos.ReplicationFactor.THREE));
+ when(container.getState()).thenReturn(HddsProtos.LifeCycleState.CLOSED);
when(container.containerID()).thenReturn(ContainerID.valueOf(123456));
when(container.getContainerID()).thenReturn((long)123456);
when(reconContainerMetadataManager.getKeyCountForContainer(
(long) 123456)).thenReturn(5L);
+ when(emptyContainer.getReplicationFactor())
+ .thenReturn(HddsProtos.ReplicationFactor.THREE);
when(emptyContainer.getReplicationConfig())
.thenReturn(
RatisReplicationConfig
@@ -95,7 +102,7 @@ public class TestContainerHealthTaskRecordGenerator {
Set<ContainerReplica> replicas = new HashSet<>();
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
// Missing record should be retained
assertTrue(ContainerHealthTask.ContainerHealthRecords
.retainOrUpdateRecord(status, missingRecord()
@@ -114,7 +121,7 @@ public class TestContainerHealthTaskRecordGenerator {
replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
status = new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
assertFalse(ContainerHealthTask.ContainerHealthRecords
.retainOrUpdateRecord(status, missingRecord()
));
@@ -127,7 +134,7 @@ public class TestContainerHealthTaskRecordGenerator {
generateReplicas(container, CLOSED, CLOSED);
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
UnhealthyContainersRecord rec = underReplicatedRecord();
assertTrue(ContainerHealthTask.ContainerHealthRecords
@@ -150,7 +157,7 @@ public class TestContainerHealthTaskRecordGenerator {
// Container is now replicated OK - should be removed.
replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
status = new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
assertFalse(ContainerHealthTask.ContainerHealthRecords
.retainOrUpdateRecord(status, rec));
}
@@ -162,7 +169,7 @@ public class TestContainerHealthTaskRecordGenerator {
generateReplicas(container, CLOSED, CLOSED, CLOSED, CLOSED);
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
UnhealthyContainersRecord rec = overReplicatedRecord();
assertTrue(ContainerHealthTask.ContainerHealthRecords
@@ -185,7 +192,7 @@ public class TestContainerHealthTaskRecordGenerator {
// Container is now replicated OK - should be removed.
replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
status = new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
assertFalse(ContainerHealthTask.ContainerHealthRecords
.retainOrUpdateRecord(status, rec));
}
@@ -200,7 +207,7 @@ public class TestContainerHealthTaskRecordGenerator {
.thenReturn(new ContainerPlacementStatusDefault(2, 3, 5));
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
UnhealthyContainersRecord rec = misReplicatedRecord();
assertTrue(ContainerHealthTask.ContainerHealthRecords
@@ -226,7 +233,7 @@ public class TestContainerHealthTaskRecordGenerator {
anyList(), anyInt()))
.thenReturn(new ContainerPlacementStatusDefault(3, 3, 5));
status = new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
assertFalse(ContainerHealthTask.ContainerHealthRecords
.retainOrUpdateRecord(status, rec));
}
@@ -243,7 +250,7 @@ public class TestContainerHealthTaskRecordGenerator {
// HEALTHY container - no records generated.
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
List<UnhealthyContainers> records =
ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, (long) 1234567,
@@ -273,7 +280,7 @@ public class TestContainerHealthTaskRecordGenerator {
generateReplicas(container, CLOSED, CLOSED, CLOSED, CLOSED, CLOSED);
status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
records = ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, (long) 1234567,
unhealthyContainerStateStatsMap);
@@ -311,7 +318,7 @@ public class TestContainerHealthTaskRecordGenerator {
.thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
records = ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, (long) 1234567,
unhealthyContainerStateStatsMap);
@@ -359,7 +366,7 @@ public class TestContainerHealthTaskRecordGenerator {
.thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
records = ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, (long) 1234567,
unhealthyContainerStateStatsMap);
@@ -388,7 +395,7 @@ public class TestContainerHealthTaskRecordGenerator {
status =
new ContainerHealthStatus(emptyContainer, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
records = ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, (long) 345678,
unhealthyContainerStateStatsMap);
@@ -432,7 +439,7 @@ public class TestContainerHealthTaskRecordGenerator {
container, CLOSED, CLOSED, CLOSED, CLOSED, CLOSED);
ContainerHealthStatus status =
new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
List<UnhealthyContainers> records =
ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, existingRec, (long) 1234567,
@@ -460,7 +467,7 @@ public class TestContainerHealthTaskRecordGenerator {
// Missing
replicas.clear();
status = new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
records = ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, existingRec, (long) 1234567,
unhealthyContainerStateStatsMap);
@@ -490,7 +497,7 @@ public class TestContainerHealthTaskRecordGenerator {
anyList(), anyInt()))
.thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
status = new ContainerHealthStatus(container, replicas, placementPolicy,
- reconContainerMetadataManager);
+ reconContainerMetadataManager, CONF);
records = ContainerHealthTask.ContainerHealthRecords
.generateUnhealthyRecords(status, existingRec, (long) 1234567,
unhealthyContainerStateStatsMap);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]