ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2588208087
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +124,108 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,
ReconContainerMetadataManager reconContainerMetadataManager,
BucketLayout bucketLayout,
String taskName,
- long containerKeyFlushToDBMaxThreshold) {
- long omKeyCount = 0;
- Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
- Map<Long, Long> containerKeyCountMap = new HashMap<>();
+ long containerKeyFlushToDBMaxThreshold,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory) {
try {
- LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName, bucketLayout);
Instant start = Instant.now();
- // Ensure the tables are truncated only once
- truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+ // Perform one-time initialization (truncate tables + clear shared map)
+ initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager, taskName);
- // Get the appropriate table based on BucketLayout
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
- // Iterate through the table and process keys
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap,
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1, containerKeyFlushToDBMaxThreshold / maxWorkers);
+
+ // Map thread IDs to worker-specific local maps for lockless updates
+ Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new ConcurrentHashMap<>();
+
+ Object flushLock = new Object();
+
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ try {
+ // Get or create this worker's private local map using thread ID
+ Map<ContainerKeyPrefix, Integer> myLocalMap = allLocalMaps.computeIfAbsent(
Review Comment:
Renamed myLocalMap to containerKeyPrefixMap for better clarity.
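
For readers following the thread, here is a minimal, self-contained sketch of the pattern the hunk above relies on: a global flush threshold split across workers, and one private map per worker thread looked up by thread ID through `computeIfAbsent`. It is an illustration only, not the code in this PR. The class `PerWorkerMapSketch`, the method names, and the use of `String` keys in place of `ContainerKeyPrefix` are all assumptions made for the example, and the flush-to-DB step is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Ozone Recon.
final class PerWorkerMapSketch {

  // Split a global flush threshold across workers, never going below 1.
  static long perWorkerThreshold(long globalThreshold, int maxWorkers) {
    return Math.max(1, globalThreshold / maxWorkers);
  }

  // Count key occurrences with one private map per worker thread,
  // then merge the private maps once all workers have finished.
  static Map<String, Integer> countInParallel(String[] keys, int maxWorkers) {
    // Outer map is concurrent; each inner map is touched by one thread only.
    Map<Long, Map<String, Integer>> allLocalMaps = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(maxWorkers);
    for (String key : keys) {
      pool.submit(() -> {
        // Get or create this worker's private map using its thread ID.
        Map<String, Integer> containerKeyPrefixMap = allLocalMaps.computeIfAbsent(
            Thread.currentThread().getId(), id -> new HashMap<>());
        containerKeyPrefixMap.merge(key, 1, Integer::sum);
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    // All workers are done, so reading the private maps here is safe.
    Map<String, Integer> merged = new HashMap<>();
    for (Map<String, Integer> local : allLocalMaps.values()) {
      local.forEach((k, v) -> merged.merge(k, v, Integer::sum));
    }
    return merged;
  }
}
```

Because each inner `HashMap` is only ever written by the thread whose ID keys it, the per-key updates need no lock. Only the outer lookup goes through the `ConcurrentHashMap`. In the sketch, merging is deferred until the pool has terminated, which sidesteps the flush coordination that the real change presumably handles with `flushLock`.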
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]