This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1398f587c3 HDDS-9819. Recon - Potential memory overflow in Container
Health Task. (#5841)
1398f587c3 is described below
commit 1398f587c360a553f92536ae521ac171d7413a05
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Sun Jan 14 21:44:02 2024 +0530
HDDS-9819. Recon - Potential memory overflow in Container Health Task.
(#5841)
---
.../apache/hadoop/ozone/recon/TestReconTasks.java | 7 ++---
.../ozone/recon/fsck/ContainerHealthTask.java | 35 +++++++++++++++++++---
.../ozone/recon/fsck/TestContainerHealthTask.java | 18 +++++++----
3 files changed, 45 insertions(+), 15 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
index 965dd2f525..44385698c5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
@@ -72,8 +72,8 @@ public class TestReconTasks {
taskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(15));
conf.setFromObject(taskConfig);
- conf.set("ozone.scm.stale.node.interval", "10s");
- conf.set("ozone.scm.dead.node.interval", "20s");
+ conf.set("ozone.scm.stale.node.interval", "6s");
+ conf.set("ozone.scm.dead.node.interval", "10s");
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
.includeRecon(true).build();
cluster.waitForClusterToBeReady();
@@ -102,9 +102,6 @@ public class TestReconTasks {
final ContainerInfo container2 = scmContainerManager.allocateContainer(
RatisReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE), "admin");
- reconContainerManager.allocateContainer(
- RatisReplicationConfig.getInstance(
- HddsProtos.ReplicationFactor.ONE), "admin");
scmContainerManager.updateContainerState(container1.containerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
scmContainerManager.updateContainerState(container2.containerID(),
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index bb93923cfd..4296dca366 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -53,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_KEYS;
import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_USED_BYTES;
@@ -65,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerHealthTask.class);
+ public static final int FETCH_COUNT = Integer.parseInt(DEFAULT_FETCH_COUNT);
private ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -131,8 +133,24 @@ public class ContainerHealthTask extends ReconScmTask {
LOG.info("Container Health task thread took {} milliseconds to" +
" process {} existing database records.",
Time.monotonicNow() - start, existingCount);
+
+ checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
+ processedContainers.clear();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void checkAndProcessContainers(
+ Map<UnHealthyContainerStates, Map<String, Long>>
+ unhealthyContainerStateStatsMap, long currentTime) {
+ ContainerID startID = ContainerID.valueOf(1);
+ List<ContainerInfo> containers = containerManager.getContainers(startID,
+ FETCH_COUNT);
+ long start;
+ long iterationCount = 0;
+ while (!containers.isEmpty()) {
start = Time.monotonicNow();
- final List<ContainerInfo> containers = containerManager.getContainers();
containers.stream()
.filter(c -> !processedContainers.contains(c))
.forEach(c -> processContainer(c, currentTime,
@@ -142,10 +160,19 @@ public class ContainerHealthTask extends ReconScmTask {
" processing {} containers.", Time.monotonicNow() - start,
containers.size());
logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
- processedContainers.clear();
- } finally {
- lock.writeLock().unlock();
+ if (containers.size() >= FETCH_COUNT) {
+ startID = ContainerID.valueOf(
+ containers.get(containers.size() - 1).getContainerID() + 1);
+ containers = containerManager.getContainers(startID, FETCH_COUNT);
+ } else {
+ containers.clear();
+ }
+ iterationCount++;
}
+ LOG.info(
+ "Container Health task thread took {} iterations to fetch all " +
+ "containers using batched approach with batch size of {}",
+ iterationCount, FETCH_COUNT);
}
private void logUnhealthyContainerStats(
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index ea8f9f2a4b..358799cc03 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.ozone.recon.fsck;
-import static
org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
import static org.assertj.core.api.Assertions.assertThat;
+import static
org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -96,7 +98,8 @@ public class TestContainerHealthTask extends AbstractReconSqlDBTest {
List<ContainerInfo> mockContainers = getMockContainers(7);
when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
- when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+ when(containerManagerMock.getContainers(any(ContainerID.class),
+ anyInt())).thenReturn(mockContainers);
for (ContainerInfo c : mockContainers) {
when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -151,7 +154,7 @@ public class TestContainerHealthTask extends AbstractReconSqlDBTest {
reconTaskStatusDao, containerHealthSchemaManager,
placementMock, reconTaskConfig, reconContainerMetadataManager);
containerHealthTask.start();
- LambdaTestUtils.await(6000, 1000, () ->
+ LambdaTestUtils.await(60000, 1000, () ->
(unHealthyContainersTableHandle.count() == 6));
UnhealthyContainers rec =
unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
@@ -192,7 +195,8 @@ public class TestContainerHealthTask extends AbstractReconSqlDBTest {
ReconTaskStatus taskStatus =
reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-
assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+ assertThat(taskStatus.getLastUpdatedTimestamp())
+ .isGreaterThan(currentTime);
// Now run the job again, to check that relevant records are updated or
// removed as appropriate. Need to adjust the return value for all the
mocks
@@ -267,7 +271,8 @@ public class TestContainerHealthTask extends AbstractReconSqlDBTest {
List<ContainerInfo> mockContainers = getMockContainers(3);
when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
- when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+ when(containerManagerMock.getContainers(any(ContainerID.class),
+ anyInt())).thenReturn(mockContainers);
for (ContainerInfo c : mockContainers) {
when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -327,7 +332,8 @@ public class TestContainerHealthTask extends AbstractReconSqlDBTest {
ReconTaskStatus taskStatus =
reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-
assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+ assertThat(taskStatus.getLastUpdatedTimestamp())
+ .isGreaterThan(currentTime);
}
private Set<ContainerReplica> getMockReplicas(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]