This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1398f587c3 HDDS-9819. Recon - Potential memory overflow in Container 
Health Task. (#5841)
1398f587c3 is described below

commit 1398f587c360a553f92536ae521ac171d7413a05
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Sun Jan 14 21:44:02 2024 +0530

    HDDS-9819. Recon - Potential memory overflow in Container Health Task. 
(#5841)
---
 .../apache/hadoop/ozone/recon/TestReconTasks.java  |  7 ++---
 .../ozone/recon/fsck/ContainerHealthTask.java      | 35 +++++++++++++++++++---
 .../ozone/recon/fsck/TestContainerHealthTask.java  | 18 +++++++----
 3 files changed, 45 insertions(+), 15 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
index 965dd2f525..44385698c5 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
@@ -72,8 +72,8 @@ public class TestReconTasks {
     taskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(15));
     conf.setFromObject(taskConfig);
 
-    conf.set("ozone.scm.stale.node.interval", "10s");
-    conf.set("ozone.scm.dead.node.interval", "20s");
+    conf.set("ozone.scm.stale.node.interval", "6s");
+    conf.set("ozone.scm.dead.node.interval", "10s");
     cluster =  MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
         .includeRecon(true).build();
     cluster.waitForClusterToBeReady();
@@ -102,9 +102,6 @@ public class TestReconTasks {
     final ContainerInfo container2 = scmContainerManager.allocateContainer(
         RatisReplicationConfig.getInstance(
             HddsProtos.ReplicationFactor.ONE), "admin");
-    reconContainerManager.allocateContainer(
-        RatisReplicationConfig.getInstance(
-            HddsProtos.ReplicationFactor.ONE), "admin");
     scmContainerManager.updateContainerState(container1.containerID(),
         HddsProtos.LifeCycleEvent.FINALIZE);
     scmContainerManager.updateContainerState(container2.containerID(),
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index bb93923cfd..4296dca366 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -53,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_KEYS;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_USED_BYTES;
 
@@ -65,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerHealthTask.class);
+  public static final int FETCH_COUNT = Integer.parseInt(DEFAULT_FETCH_COUNT);
 
   private ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
@@ -131,8 +133,24 @@ public class ContainerHealthTask extends ReconScmTask {
       LOG.info("Container Health task thread took {} milliseconds to" +
               " process {} existing database records.",
           Time.monotonicNow() - start, existingCount);
+
+      checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
+      processedContainers.clear();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  private void checkAndProcessContainers(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap, long currentTime) {
+    ContainerID startID = ContainerID.valueOf(1);
+    List<ContainerInfo> containers = containerManager.getContainers(startID,
+        FETCH_COUNT);
+    long start;
+    long iterationCount = 0;
+    while (!containers.isEmpty()) {
       start = Time.monotonicNow();
-      final List<ContainerInfo> containers = containerManager.getContainers();
       containers.stream()
           .filter(c -> !processedContainers.contains(c))
           .forEach(c -> processContainer(c, currentTime,
@@ -142,10 +160,19 @@ public class ContainerHealthTask extends ReconScmTask {
               " processing {} containers.", Time.monotonicNow() - start,
           containers.size());
       logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
-      processedContainers.clear();
-    } finally {
-      lock.writeLock().unlock();
+      if (containers.size() >= FETCH_COUNT) {
+        startID = ContainerID.valueOf(
+            containers.get(containers.size() - 1).getContainerID() + 1);
+        containers = containerManager.getContainers(startID, FETCH_COUNT);
+      } else {
+        containers.clear();
+      }
+      iterationCount++;
     }
+    LOG.info(
+        "Container Health task thread took {} iterations to fetch all " +
+            "containers using batched approach with batch size of {}",
+        iterationCount, FETCH_COUNT);
   }
 
   private void logUnhealthyContainerStats(
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index ea8f9f2a4b..358799cc03 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.ozone.recon.fsck;
 
-import static 
org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static 
org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -96,7 +98,8 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     List<ContainerInfo> mockContainers = getMockContainers(7);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -151,7 +154,7 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
             reconTaskStatusDao, containerHealthSchemaManager,
             placementMock, reconTaskConfig, reconContainerMetadataManager);
     containerHealthTask.start();
-    LambdaTestUtils.await(6000, 1000, () ->
+    LambdaTestUtils.await(60000, 1000, () ->
         (unHealthyContainersTableHandle.count() == 6));
     UnhealthyContainers rec =
         unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
@@ -192,7 +195,8 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
 
     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    
assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);
 
     // Now run the job again, to check that relevant records are updated or
     // removed as appropriate. Need to adjust the return value for all the 
mocks
@@ -267,7 +271,8 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     List<ContainerInfo> mockContainers = getMockContainers(3);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -327,7 +332,8 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
 
     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    
assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);
   }
 
   private Set<ContainerReplica> getMockReplicas(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to