This is an automated email from the ASF dual-hosted git repository.

zitadombi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a2aa1d4b83 HDDS-8580. Reduce memory usage in ContainerKeyMapperTask#reprocess (#4696)
a2aa1d4b83 is described below

commit a2aa1d4b8390a81f4be33f216fc748b44a348159
Author: Ivan Andika <[email protected]>
AuthorDate: Sat Jun 3 17:31:28 2023 +0800

    HDDS-8580. Reduce memory usage in ContainerKeyMapperTask#reprocess (#4696)
---
 .../common/src/main/resources/ozone-default.xml    |  10 ++
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |   7 +
 .../ozone/recon/tasks/ContainerKeyMapperTask.java  | 144 +++++++++++++++++++--
 .../ozone/recon/api/TestContainerEndpoint.java     |   7 +-
 .../ozone/recon/api/TestOmDBInsightEndPoint.java   |   6 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |  15 ++-
 6 files changed, 170 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2a61ec1d4c..f2e908ab47 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3714,6 +3714,16 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.recon.containerkey.flush.db.max.threshold</name>
+    <value>150000</value>
+    <tag>OZONE, RECON, PERFORMANCE</tag>
+    <description>
+      Maximum threshold number of entries to hold in memory for Container Key Mapper task in hashmap before flushing to
+      recon rocks DB containerKeyTable
+    </description>
+  </property>
+    
   <property>
     <name>ozone.recon.heatmap.provider</name>
     <value></value>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index 2c97a0dfd3..b3c601c4c1 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -150,6 +150,13 @@ public final class  ReconServerConfigKeys {
   public static final long
       OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L;
 
+  public static final String
+      OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD =
+      "ozone.recon.containerkey.flush.db.max.threshold";
+
+  public static final long
+      OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L;
+
   public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY =
       "ozone.recon.scm.snapshot.task.interval.delay";
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
index 086827e9cb..349cd7699f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
@@ -27,6 +27,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,12 +37,14 @@ import java.util.Set;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
 import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
@@ -62,11 +65,19 @@ public class ContainerKeyMapperTask implements ReconOmTask {
       LoggerFactory.getLogger(ContainerKeyMapperTask.class);
 
   private ReconContainerMetadataManager reconContainerMetadataManager;
+  private final long containerKeyFlushToDBMaxThreshold;
 
   @Inject
   public ContainerKeyMapperTask(ReconContainerMetadataManager
-                                        reconContainerMetadataManager) {
+                                        reconContainerMetadataManager,
+                                OzoneConfiguration configuration) {
     this.reconContainerMetadataManager = reconContainerMetadataManager;
+    this.containerKeyFlushToDBMaxThreshold = configuration.getLong(
+        ReconServerConfigKeys.
+            OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD,
+        ReconServerConfigKeys.
+            OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT
+    );
   }
 
   /**
@@ -76,13 +87,12 @@ public class ContainerKeyMapperTask implements ReconOmTask {
   @Override
   public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
     long omKeyCount = 0;
-    // Maps the (container, key) -> count
+
+    // In-memory maps for fast look up and batch write
+    // (container, key) -> count
     Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
-    // Maps the key -> containerId
+    // containerId -> key count
     Map<Long, Long> containerKeyCountMap = new HashMap<>();
-    // List of the deleted (container, key) pair's
-    List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
-
     try {
       LOG.info("Starting a 'reprocess' run of ContainerKeyMapperTask.");
       Instant start = Instant.now();
@@ -94,6 +104,11 @@ public class ContainerKeyMapperTask implements ReconOmTask {
       // loop over both key table and file table
       for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY,
           BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+        // (HDDS-8580) Since "reprocess" iterate over the whole key table,
+        // containerKeyMap needs to be incrementally flushed to DB based on
+        // configured batch threshold.
+        // containerKeyCountMap can be flushed at the end since the number
+        // of containers in a cluster will not have significant memory overhead.
         Table<String, OmKeyInfo> omKeyInfoTable =
             omMetadataManager.getKeyTable(layout);
         try (
@@ -102,30 +117,66 @@ public class ContainerKeyMapperTask implements ReconOmTask {
           while (keyIter.hasNext()) {
             Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
             OmKeyInfo omKeyInfo = kv.getValue();
-            handlePutOMKeyEvent(kv.getKey(), omKeyInfo, containerKeyMap,
-                containerKeyCountMap, deletedKeyCountList);
+            handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap,
+                containerKeyCountMap);
+            if (!checkAndCallFlushToDB(containerKeyMap)) {
+              LOG.error("Unable to flush containerKey information to the DB");
+              return new ImmutablePair<>(getTaskName(), false);
+            }
             omKeyCount++;
           }
         }
       }
 
+      // flush and commit left out keys at end,
+      // also batch write containerKeyCountMap to the containerKeyCountTable
+      if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap,
+          containerKeyCountMap)) {
+        LOG.error("Unable to flush Container Key Count and " +
+            "remaining Container Key information to the DB");
+        return new ImmutablePair<>(getTaskName(), false);
+      }
+
       LOG.info("Completed 'reprocess' of ContainerKeyMapperTask.");
       Instant end = Instant.now();
       long duration = Duration.between(start, end).toMillis();
       LOG.info("It took me {} seconds to process {} keys.",
           (double) duration / 1000.0, omKeyCount);
     } catch (IOException ioEx) {
-      LOG.error("Unable to populate Container Key Prefix data in Recon DB. ",
+      LOG.error("Unable to populate Container Key data in Recon DB. ",
           ioEx);
       return new ImmutablePair<>(getTaskName(), false);
     }
+    return new ImmutablePair<>(getTaskName(), true);
+  }
+
+  private boolean flushAndCommitContainerKeyInfoToDB(
+      Map<ContainerKeyPrefix, Integer> containerKeyMap,
+      Map<Long, Long> containerKeyCountMap) {
     try {
-      writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList);
+      // deleted container list is not needed since "reprocess" only has
+      // put operations
+      writeToTheDB(containerKeyMap, containerKeyCountMap,
+          Collections.emptyList());
+      containerKeyMap.clear();
+      containerKeyCountMap.clear();
     } catch (IOException e) {
-      LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
-      return new ImmutablePair<>(getTaskName(), false);
+      LOG.error("Unable to write Container Key and " +
+          "Container Key Count data in Recon DB.", e);
+      return false;
     }
-    return new ImmutablePair<>(getTaskName(), true);
+    return true;
+  }
+
+  private boolean checkAndCallFlushToDB(
+      Map<ContainerKeyPrefix, Integer> containerKeyMap) {
+    // if containerKeyMap more than entries, flush to DB and clear the map
+    if (null != containerKeyMap && containerKeyMap.size() >=
+          containerKeyFlushToDBMaxThreshold) {
+      return flushAndCommitContainerKeyInfoToDB(containerKeyMap,
+          Collections.emptyMap());
+    }
+    return true;
   }
 
   @Override
@@ -145,8 +196,17 @@ public class ContainerKeyMapperTask implements ReconOmTask {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
     int eventCount = 0;
     final Collection<String> taskTables = getTaskTables();
+
+    // In-memory maps for fast look up and batch write
+    // (HDDS-8580) containerKeyMap map is allowed to be used
+    // in "process" without batching since the maximum number of keys
+    // is bounded by delta limit configurations
+
+    // (container, key) -> count
     Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
+    // containerId -> key count
     Map<Long, Long> containerKeyCountMap = new HashMap<>();
+    // List of the deleted (container, key) pair's
     List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
 
     while (eventIterator.hasNext()) {
@@ -383,4 +443,62 @@ public class ContainerKeyMapperTask implements ReconOmTask {
     }
   }
 
+  /**
+   * Write an OM key to container DB and update containerID -> no. of keys
+   * count to the Global Stats table.
+   *
+   * @param key key String
+   * @param omKeyInfo omKeyInfo value
+   * @param containerKeyMap we keep the added containerKeys in this map
+   *                        to allow incremental batching to containerKeyTable
+   * @param containerKeyCountMap we keep the containerKey counts in this map 
+   *                             to allow batching to containerKeyCountTable
+   *                             after reprocessing is done
+   * @throws IOException if unable to write to recon DB.
+   */
+  private void handleKeyReprocess(String key,
+                                  OmKeyInfo omKeyInfo,
+                                  Map<ContainerKeyPrefix, Integer>
+                                      containerKeyMap,
+                                  Map<Long, Long> containerKeyCountMap)
+      throws IOException {
+    long containerCountToIncrement = 0;
+    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
+        .getKeyLocationVersions()) {
+      long keyVersion = omKeyLocationInfoGroup.getVersion();
+      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
+          .getLocationList()) {
+        long containerId = omKeyLocationInfo.getContainerID();
+        ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(
+            containerId, key, keyVersion);
+        if (reconContainerMetadataManager.getCountForContainerKeyPrefix(
+            containerKeyPrefix) == 0
+            && !containerKeyMap.containsKey(containerKeyPrefix)) {
+          // Save on writes. No need to save same container-key prefix
+          // mapping again.
+          containerKeyMap.put(containerKeyPrefix, 1);
+
+          // check if container already exists and
+          // if it exists, update the count of keys for the given containerID
+          // else, increment the count of containers and initialize keyCount
+          long keyCount;
+          if (containerKeyCountMap.containsKey(containerId)) {
+            keyCount = containerKeyCountMap.get(containerId);
+          } else {
+            containerCountToIncrement++;
+            keyCount = 0;
+          }
+
+          // increment the count and update containerKeyCount.
+          containerKeyCountMap.put(containerId, ++keyCount);
+        }
+      }
+    }
+
+    if (containerCountToIncrement > 0) {
+      reconContainerMetadataManager
+          .incrementContainerCountBy(containerCountToIncrement);
+    }
+  }
+
 }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
index 47e25bf22b..9b7ba54df0 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -127,6 +128,8 @@ public class TestContainerEndpoint {
   private boolean isSetupDone = false;
   private ContainerHealthSchemaManager containerHealthSchemaManager;
   private ReconOMMetadataManager reconOMMetadataManager;
+  private OzoneConfiguration omConfiguration;
+
   private ContainerID containerID = ContainerID.valueOf(1L);
   private Pipeline pipeline;
   private PipelineID pipelineID;
@@ -208,6 +211,7 @@ public class TestContainerEndpoint {
       initializeInjector();
       isSetupDone = true;
     }
+    omConfiguration = new OzoneConfiguration();
 
     List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
     BlockID blockID1 = new BlockID(1, 101);
@@ -289,7 +293,8 @@ public class TestContainerEndpoint {
 
   private void reprocessContainerKeyMapper() {
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager);
+        new ContainerKeyMapperTask(reconContainerMetadataManager,
+            omConfiguration);
     containerKeyMapperTask.reprocess(reconOMMetadataManager);
   }
 
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index 23b94e4ccf..71fe6185f7 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.recon.api;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
@@ -78,6 +79,7 @@ public class TestOmDBInsightEndPoint {
   private OMDBInsightEndpoint omdbInsightEndpoint;
   private Pipeline pipeline;
   private Random random = new Random();
+  private OzoneConfiguration ozoneConfiguration;
 
   @Before
   public void setUp() throws Exception {
@@ -110,6 +112,7 @@ public class TestOmDBInsightEndPoint {
         ozoneStorageContainerManager.getPipelineManager();
     pipeline = getRandomPipeline();
     reconPipelineManager.addPipeline(pipeline);
+    ozoneConfiguration = new OzoneConfiguration();
     setUpOmData();
   }
 
@@ -182,7 +185,8 @@ public class TestOmDBInsightEndPoint {
     when(omMetadataManagerMock.getKeyTable(getBucketLayout()))
         .thenReturn(tableMock);
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager);
+        new ContainerKeyMapperTask(reconContainerMetadataManager,
+            ozoneConfiguration);
     containerKeyMapperTask.reprocess(reconOMMetadataManager);
   }
 
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
index 176b9761ff..eea213a544 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
@@ -37,6 +37,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -66,6 +67,7 @@ public class TestContainerKeyMapperTask {
   private OMMetadataManager omMetadataManager;
   private ReconOMMetadataManager reconOMMetadataManager;
   private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
+  private OzoneConfiguration omConfiguration;
 
   private static final String FSO_KEY_NAME = "dir1/file7";
   private static final String BUCKET_NAME = "bucket1";
@@ -85,6 +87,7 @@ public class TestContainerKeyMapperTask {
     ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
     reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
         temporaryFolder.newFolder());
+    omConfiguration = new OzoneConfiguration();
 
     ReconTestInjector reconTestInjector =
         new ReconTestInjector.Builder(temporaryFolder)
@@ -132,7 +135,8 @@ public class TestContainerKeyMapperTask {
         Collections.singletonList(omKeyLocationInfoGroup));
 
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager);
+        new ContainerKeyMapperTask(reconContainerMetadataManager,
+            omConfiguration);
     containerKeyMapperTask.reprocess(reconOMMetadataManager);
 
     keyPrefixesForContainer =
@@ -204,7 +208,8 @@ public class TestContainerKeyMapperTask {
 
     // Reprocess container key mappings
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager);
+        new ContainerKeyMapperTask(reconContainerMetadataManager,
+            omConfiguration);
     containerKeyMapperTask.reprocess(reconOMMetadataManager);
 
     // Check the key prefixes for container 1
@@ -311,7 +316,8 @@ public class TestContainerKeyMapperTask {
         }});
 
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager);
+        new ContainerKeyMapperTask(reconContainerMetadataManager,
+            omConfiguration);
     containerKeyMapperTask.reprocess(reconOMMetadataManager);
 
     keyPrefixesForContainer = reconContainerMetadataManager
@@ -380,7 +386,8 @@ public class TestContainerKeyMapperTask {
 
     // Reprocess container key mappings
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager);
+        new ContainerKeyMapperTask(reconContainerMetadataManager,
+            omConfiguration);
 
     String bucket = BUCKET_NAME;
     String volume = VOLUME_NAME;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to