ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2588208087


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +124,108 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,
                                                 ReconContainerMetadataManager reconContainerMetadataManager,
                                                 BucketLayout bucketLayout,
                                                 String taskName,
-                                                long containerKeyFlushToDBMaxThreshold) {
-    long omKeyCount = 0;
-    Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
-    Map<Long, Long> containerKeyCountMap = new HashMap<>();
+                                                long containerKeyFlushToDBMaxThreshold,
+                                                int maxIterators,
+                                                int maxWorkers,
+                                                int maxKeysInMemory) {
 
     try {
-      LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+      LOG.info("{}: Starting reprocess for bucket layout {}", taskName, bucketLayout);
       Instant start = Instant.now();
 
-      // Ensure the tables are truncated only once
-      truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+      // Perform one-time initialization (truncate tables + clear shared map)
+      initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager, taskName);
 
-      // Get the appropriate table based on BucketLayout
       Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
 
-      // Iterate through the table and process keys
-      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = omKeyInfoTable.iterator()) {
-        while (keyIter.hasNext()) {
-          Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-          handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap,
+      // Divide threshold by worker count so each worker flushes independently
+      final long PER_WORKER_THRESHOLD = Math.max(1, containerKeyFlushToDBMaxThreshold / maxWorkers);
+      
+      // Map thread IDs to worker-specific local maps for lockless updates
+      Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new ConcurrentHashMap<>();
+      
+      Object flushLock = new Object();
+      
+      Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+        try {
+          // Get or create this worker's private local map using thread ID
+          Map<ContainerKeyPrefix, Integer> myLocalMap = allLocalMaps.computeIfAbsent(

Review Comment:
   Renamed `myLocalMap` to `containerKeyPrefixMap` for clarity.
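   For readers following the change, below is a minimal standalone sketch of the pattern the hunk introduces. Each worker gets a private map keyed by its thread ID in a `ConcurrentHashMap`. It flushes that map once it reaches `Math.max(1, threshold / maxWorkers)` entries, and flushes are serialized on a shared lock. This is not the PR's implementation. `String` keys stand in for `ContainerKeyPrefix`, and a `HashMap` stands in for the Recon container DB. Counting occurrences with `merge` is only for illustration. `PerWorkerFlushSketch`, `process`, `flush`, `flushRemaining` and `runDemo` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of per-worker buffering with thread-ID-keyed local maps.
public class PerWorkerFlushSketch {

  // Stand-in for the Recon container DB (assumption); guarded by flushLock.
  private final Map<String, Integer> store = new HashMap<>();
  private final Object flushLock = new Object();
  // Thread ID -> that worker's private buffer; only the owning thread mutates it while processing.
  private final Map<Long, Map<String, Integer>> allLocalMaps = new ConcurrentHashMap<>();
  private final long perWorkerThreshold;

  PerWorkerFlushSketch(long flushThreshold, int maxWorkers) {
    // Split the global threshold across workers, but never go below 1.
    this.perWorkerThreshold = Math.max(1, flushThreshold / maxWorkers);
  }

  long perWorkerThreshold() {
    return perWorkerThreshold;
  }

  // Called from a worker thread for every key it handles.
  void process(String containerKeyPrefix) {
    Map<String, Integer> containerKeyPrefixMap = allLocalMaps.computeIfAbsent(
        Thread.currentThread().getId(), id -> new HashMap<>());
    // Illustrative only: counts occurrences per prefix.
    containerKeyPrefixMap.merge(containerKeyPrefix, 1, Integer::sum);
    if (containerKeyPrefixMap.size() >= perWorkerThreshold) {
      flush(containerKeyPrefixMap);
    }
  }

  // Merges one buffer into the shared store under the lock, then empties it.
  private void flush(Map<String, Integer> localMap) {
    synchronized (flushLock) {
      localMap.forEach((k, v) -> store.merge(k, v, Integer::sum));
    }
    localMap.clear();
  }

  // Call only after all workers have finished.
  void flushRemaining() {
    allLocalMaps.values().forEach(this::flush);
  }

  Map<String, Integer> snapshot() {
    synchronized (flushLock) {
      return new HashMap<>(store);
    }
  }

  // Feeds keyCount keys spread over 7 hypothetical prefixes through a fixed pool.
  static Map<String, Integer> runDemo(int workers, long flushThreshold, int keyCount) {
    PerWorkerFlushSketch sketch = new PerWorkerFlushSketch(flushThreshold, workers);
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < keyCount; i++) {
      String key = "container-" + (i % 7);
      pool.submit(() -> sketch.process(key));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    // Safe: awaitTermination establishes happens-before with every completed task.
    sketch.flushRemaining();
    return sketch.snapshot();
  }

  public static void main(String[] args) {
    Map<String, Integer> result = runDemo(4, 10, 1000);
    System.out.println(result.values().stream().mapToInt(Integer::intValue).sum()); // prints 1000
  }
}
```

   Each thread only touches its own `containerKeyPrefixMap`, so the hot path needs no lock. The lock is taken only when a buffer is merged into the shared store. The final `flushRemaining()` is safe only after the pool has terminated.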
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
