This is an automated email from the ASF dual-hosted git repository.
zitadombi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a2aa1d4b83 HDDS-8580. Reduce memory usage in ContainerKeyMapperTask#reprocess (#4696)
a2aa1d4b83 is described below
commit a2aa1d4b8390a81f4be33f216fc748b44a348159
Author: Ivan Andika <[email protected]>
AuthorDate: Sat Jun 3 17:31:28 2023 +0800
HDDS-8580. Reduce memory usage in ContainerKeyMapperTask#reprocess (#4696)
---
.../common/src/main/resources/ozone-default.xml | 10 ++
.../hadoop/ozone/recon/ReconServerConfigKeys.java | 7 +
.../ozone/recon/tasks/ContainerKeyMapperTask.java | 144 +++++++++++++++++++--
.../ozone/recon/api/TestContainerEndpoint.java | 7 +-
.../ozone/recon/api/TestOmDBInsightEndPoint.java | 6 +-
.../recon/tasks/TestContainerKeyMapperTask.java | 15 ++-
6 files changed, 170 insertions(+), 19 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2a61ec1d4c..f2e908ab47 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3714,6 +3714,16 @@
</description>
</property>
+ <property>
+ <name>ozone.recon.containerkey.flush.db.max.threshold</name>
+ <value>150000</value>
+ <tag>OZONE, RECON, PERFORMANCE</tag>
+ <description>
+ Maximum threshold number of entries to hold in memory for Container Key Mapper task in hashmap before flushing to
+ recon rocks DB containerKeyTable
+ </description>
+ </property>
+
<property>
<name>ozone.recon.heatmap.provider</name>
<value></value>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index 2c97a0dfd3..b3c601c4c1 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -150,6 +150,13 @@ public final class ReconServerConfigKeys {
public static final long
OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L;
+ public static final String
+ OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD =
+ "ozone.recon.containerkey.flush.db.max.threshold";
+
+ public static final long
+ OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L;
+
public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY =
"ozone.recon.scm.snapshot.task.interval.delay";
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
index 086827e9cb..349cd7699f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
@@ -27,6 +27,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -36,12 +37,14 @@ import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
@@ -62,11 +65,19 @@ public class ContainerKeyMapperTask implements ReconOmTask {
LoggerFactory.getLogger(ContainerKeyMapperTask.class);
private ReconContainerMetadataManager reconContainerMetadataManager;
+ private final long containerKeyFlushToDBMaxThreshold;
@Inject
public ContainerKeyMapperTask(ReconContainerMetadataManager
- reconContainerMetadataManager) {
+ reconContainerMetadataManager,
+ OzoneConfiguration configuration) {
this.reconContainerMetadataManager = reconContainerMetadataManager;
+ this.containerKeyFlushToDBMaxThreshold = configuration.getLong(
+ ReconServerConfigKeys.
+ OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD,
+ ReconServerConfigKeys.
+ OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT
+ );
}
/**
@@ -76,13 +87,12 @@ public class ContainerKeyMapperTask implements ReconOmTask {
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
long omKeyCount = 0;
- // Maps the (container, key) -> count
+
+ // In-memory maps for fast look up and batch write
+ // (container, key) -> count
Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
- // Maps the key -> containerId
+ // containerId -> key count
Map<Long, Long> containerKeyCountMap = new HashMap<>();
- // List of the deleted (container, key) pair's
- List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
-
try {
LOG.info("Starting a 'reprocess' run of ContainerKeyMapperTask.");
Instant start = Instant.now();
@@ -94,6 +104,11 @@ public class ContainerKeyMapperTask implements ReconOmTask {
// loop over both key table and file table
for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY,
BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+ // (HDDS-8580) Since "reprocess" iterate over the whole key table,
+ // containerKeyMap needs to be incrementally flushed to DB based on
+ // configured batch threshold.
+ // containerKeyCountMap can be flushed at the end since the number
+ // of containers in a cluster will not have significant memory overhead.
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(layout);
try (
@@ -102,30 +117,66 @@ public class ContainerKeyMapperTask implements ReconOmTask {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
OmKeyInfo omKeyInfo = kv.getValue();
- handlePutOMKeyEvent(kv.getKey(), omKeyInfo, containerKeyMap,
- containerKeyCountMap, deletedKeyCountList);
+ handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap,
+ containerKeyCountMap);
+ if (!checkAndCallFlushToDB(containerKeyMap)) {
+ LOG.error("Unable to flush containerKey information to the DB");
+ return new ImmutablePair<>(getTaskName(), false);
+ }
omKeyCount++;
}
}
}
+ // flush and commit left out keys at end,
+ // also batch write containerKeyCountMap to the containerKeyCountTable
+ if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap,
+ containerKeyCountMap)) {
+ LOG.error("Unable to flush Container Key Count and " +
+ "remaining Container Key information to the DB");
+ return new ImmutablePair<>(getTaskName(), false);
+ }
+
LOG.info("Completed 'reprocess' of ContainerKeyMapperTask.");
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
LOG.info("It took me {} seconds to process {} keys.",
(double) duration / 1000.0, omKeyCount);
} catch (IOException ioEx) {
- LOG.error("Unable to populate Container Key Prefix data in Recon DB. ",
+ LOG.error("Unable to populate Container Key data in Recon DB. ",
ioEx);
return new ImmutablePair<>(getTaskName(), false);
}
+ return new ImmutablePair<>(getTaskName(), true);
+ }
+
+ private boolean flushAndCommitContainerKeyInfoToDB(
+ Map<ContainerKeyPrefix, Integer> containerKeyMap,
+ Map<Long, Long> containerKeyCountMap) {
try {
- writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList);
+ // deleted container list is not needed since "reprocess" only has
+ // put operations
+ writeToTheDB(containerKeyMap, containerKeyCountMap,
+ Collections.emptyList());
+ containerKeyMap.clear();
+ containerKeyCountMap.clear();
} catch (IOException e) {
- LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
- return new ImmutablePair<>(getTaskName(), false);
+ LOG.error("Unable to write Container Key and " +
+ "Container Key Count data in Recon DB.", e);
+ return false;
}
- return new ImmutablePair<>(getTaskName(), true);
+ return true;
+ }
+
+ private boolean checkAndCallFlushToDB(
+ Map<ContainerKeyPrefix, Integer> containerKeyMap) {
+ // if containerKeyMap more than entries, flush to DB and clear the map
+ if (null != containerKeyMap && containerKeyMap.size() >=
+ containerKeyFlushToDBMaxThreshold) {
+ return flushAndCommitContainerKeyInfoToDB(containerKeyMap,
+ Collections.emptyMap());
+ }
+ return true;
}
@Override
@@ -145,8 +196,17 @@ public class ContainerKeyMapperTask implements ReconOmTask {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
int eventCount = 0;
final Collection<String> taskTables = getTaskTables();
+
+ // In-memory maps for fast look up and batch write
+ // (HDDS-8580) containerKeyMap map is allowed to be used
+ // in "process" without batching since the maximum number of keys
+ // is bounded by delta limit configurations
+
+ // (container, key) -> count
Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
+ // containerId -> key count
Map<Long, Long> containerKeyCountMap = new HashMap<>();
+ // List of the deleted (container, key) pair's
List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
while (eventIterator.hasNext()) {
@@ -383,4 +443,62 @@ public class ContainerKeyMapperTask implements ReconOmTask {
}
}
+ /**
+ * Write an OM key to container DB and update containerID -> no. of keys
+ * count to the Global Stats table.
+ *
+ * @param key key String
+ * @param omKeyInfo omKeyInfo value
+ * @param containerKeyMap we keep the added containerKeys in this map
+ * to allow incremental batching to containerKeyTable
+ * @param containerKeyCountMap we keep the containerKey counts in this map
+ * to allow batching to containerKeyCountTable
+ * after reprocessing is done
+ * @throws IOException if unable to write to recon DB.
+ */
+ private void handleKeyReprocess(String key,
+ OmKeyInfo omKeyInfo,
+ Map<ContainerKeyPrefix, Integer>
+ containerKeyMap,
+ Map<Long, Long> containerKeyCountMap)
+ throws IOException {
+ long containerCountToIncrement = 0;
+ for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
+ .getKeyLocationVersions()) {
+ long keyVersion = omKeyLocationInfoGroup.getVersion();
+ for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
+ .getLocationList()) {
+ long containerId = omKeyLocationInfo.getContainerID();
+ ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(
+ containerId, key, keyVersion);
+ if (reconContainerMetadataManager.getCountForContainerKeyPrefix(
+ containerKeyPrefix) == 0
+ && !containerKeyMap.containsKey(containerKeyPrefix)) {
+ // Save on writes. No need to save same container-key prefix
+ // mapping again.
+ containerKeyMap.put(containerKeyPrefix, 1);
+
+ // check if container already exists and
+ // if it exists, update the count of keys for the given containerID
+ // else, increment the count of containers and initialize keyCount
+ long keyCount;
+ if (containerKeyCountMap.containsKey(containerId)) {
+ keyCount = containerKeyCountMap.get(containerId);
+ } else {
+ containerCountToIncrement++;
+ keyCount = 0;
+ }
+
+ // increment the count and update containerKeyCount.
+ containerKeyCountMap.put(containerId, ++keyCount);
+ }
+ }
+ }
+
+ if (containerCountToIncrement > 0) {
+ reconContainerMetadataManager
+ .incrementContainerCountBy(containerCountToIncrement);
+ }
+ }
+
}
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
index 47e25bf22b..9b7ba54df0 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -127,6 +128,8 @@ public class TestContainerEndpoint {
private boolean isSetupDone = false;
private ContainerHealthSchemaManager containerHealthSchemaManager;
private ReconOMMetadataManager reconOMMetadataManager;
+ private OzoneConfiguration omConfiguration;
+
private ContainerID containerID = ContainerID.valueOf(1L);
private Pipeline pipeline;
private PipelineID pipelineID;
@@ -208,6 +211,7 @@ public class TestContainerEndpoint {
initializeInjector();
isSetupDone = true;
}
+ omConfiguration = new OzoneConfiguration();
List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
BlockID blockID1 = new BlockID(1, 101);
@@ -289,7 +293,8 @@ public class TestContainerEndpoint {
private void reprocessContainerKeyMapper() {
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(reconContainerMetadataManager);
+ new ContainerKeyMapperTask(reconContainerMetadataManager,
+ omConfiguration);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
}
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index 23b94e4ccf..71fe6185f7 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.recon.api;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
@@ -78,6 +79,7 @@ public class TestOmDBInsightEndPoint {
private OMDBInsightEndpoint omdbInsightEndpoint;
private Pipeline pipeline;
private Random random = new Random();
+ private OzoneConfiguration ozoneConfiguration;
@Before
public void setUp() throws Exception {
@@ -110,6 +112,7 @@ public class TestOmDBInsightEndPoint {
ozoneStorageContainerManager.getPipelineManager();
pipeline = getRandomPipeline();
reconPipelineManager.addPipeline(pipeline);
+ ozoneConfiguration = new OzoneConfiguration();
setUpOmData();
}
@@ -182,7 +185,8 @@ public class TestOmDBInsightEndPoint {
when(omMetadataManagerMock.getKeyTable(getBucketLayout()))
.thenReturn(tableMock);
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(reconContainerMetadataManager);
+ new ContainerKeyMapperTask(reconContainerMetadataManager,
+ ozoneConfiguration);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
}
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
index 176b9761ff..eea213a544 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
@@ -37,6 +37,7 @@ import java.util.Map;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -66,6 +67,7 @@ public class TestContainerKeyMapperTask {
private OMMetadataManager omMetadataManager;
private ReconOMMetadataManager reconOMMetadataManager;
private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
+ private OzoneConfiguration omConfiguration;
private static final String FSO_KEY_NAME = "dir1/file7";
private static final String BUCKET_NAME = "bucket1";
@@ -85,6 +87,7 @@ public class TestContainerKeyMapperTask {
ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
temporaryFolder.newFolder());
+ omConfiguration = new OzoneConfiguration();
ReconTestInjector reconTestInjector =
new ReconTestInjector.Builder(temporaryFolder)
@@ -132,7 +135,8 @@ public class TestContainerKeyMapperTask {
Collections.singletonList(omKeyLocationInfoGroup));
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(reconContainerMetadataManager);
+ new ContainerKeyMapperTask(reconContainerMetadataManager,
+ omConfiguration);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
keyPrefixesForContainer =
@@ -204,7 +208,8 @@ public class TestContainerKeyMapperTask {
// Reprocess container key mappings
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(reconContainerMetadataManager);
+ new ContainerKeyMapperTask(reconContainerMetadataManager,
+ omConfiguration);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
// Check the key prefixes for container 1
@@ -311,7 +316,8 @@ public class TestContainerKeyMapperTask {
}});
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(reconContainerMetadataManager);
+ new ContainerKeyMapperTask(reconContainerMetadataManager,
+ omConfiguration);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
keyPrefixesForContainer = reconContainerMetadataManager
@@ -380,7 +386,8 @@ public class TestContainerKeyMapperTask {
// Reprocess container key mappings
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(reconContainerMetadataManager);
+ new ContainerKeyMapperTask(reconContainerMetadataManager,
+ omConfiguration);
String bucket = BUCKET_NAME;
String volume = VOLUME_NAME;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]