ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2588213678


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +124,108 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,
                                                 ReconContainerMetadataManager reconContainerMetadataManager,
                                                 BucketLayout bucketLayout,
                                                 String taskName,
-                                                long containerKeyFlushToDBMaxThreshold) {
-    long omKeyCount = 0;
-    Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
-    Map<Long, Long> containerKeyCountMap = new HashMap<>();
+                                                long containerKeyFlushToDBMaxThreshold,
+                                                int maxIterators,
+                                                int maxWorkers,
+                                                int maxKeysInMemory) {
 
     try {
-      LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+      LOG.info("{}: Starting reprocess for bucket layout {}", taskName, bucketLayout);
       Instant start = Instant.now();
 
-      // Ensure the tables are truncated only once
-      truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+      // Perform one-time initialization (truncate tables + clear shared map)
+      initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager, taskName);
 
-      // Get the appropriate table based on BucketLayout
       Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
 
-      // Iterate through the table and process keys
-      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = omKeyInfoTable.iterator()) {
-        while (keyIter.hasNext()) {
-          Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-          handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap,
+      // Divide threshold by worker count so each worker flushes independently
+      final long PER_WORKER_THRESHOLD = Math.max(1, containerKeyFlushToDBMaxThreshold / maxWorkers);
+      
+      // Map thread IDs to worker-specific local maps for lockless updates
+      Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new ConcurrentHashMap<>();
+      
+      Object flushLock = new Object();
+      
+      Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+        try {
+          // Get or create this worker's private local map using thread ID
+          Map<ContainerKeyPrefix, Integer> myLocalMap = allLocalMaps.computeIfAbsent(
+              Thread.currentThread().getId(), k -> new ConcurrentHashMap<>());
+          
+          handleKeyReprocess(kv.getKey(), kv.getValue(), myLocalMap, SHARED_CONTAINER_KEY_COUNT_MAP,
               reconContainerMetadataManager);
-          omKeyCount++;
 
-          // Check and flush data if it reaches the batch threshold
-          if (!checkAndCallFlushToDB(containerKeyMap, containerKeyFlushToDBMaxThreshold,
+          // Flush this worker's map when it reaches threshold
+          if (myLocalMap.size() >= PER_WORKER_THRESHOLD) {
+            synchronized (flushLock) {
+              if (!flushAndCommitContainerKeyInfoToDB(myLocalMap, Collections.emptyMap(),
+                  reconContainerMetadataManager)) {
+                throw new UncheckedIOException(new IOException("Unable to flush containerKey information to the DB"));
+              }
+            }
+          }
+          return null;
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      };
+      
+      try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
+               new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
+                   StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, PER_WORKER_THRESHOLD)) {
+        keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
+      }
+
+      // Final flush: Write remaining entries from all worker local maps to DB
+      for (Map<ContainerKeyPrefix, Integer> workerLocalMap : allLocalMaps.values()) {
+        if (!workerLocalMap.isEmpty()) {
+          if (!flushAndCommitContainerKeyInfoToDB(workerLocalMap, Collections.emptyMap(),
               reconContainerMetadataManager)) {
-            LOG.error("Failed to flush container key data for {}", taskName);
+            LOG.error("Failed to flush worker local map for {}", taskName);
             return false;
           }
         }
       }
 
-      // Final flush and commit
-      if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap, containerKeyCountMap, reconContainerMetadataManager)) {
-        LOG.error("Failed to flush Container Key data to DB for {}", taskName);
+      // Capture total container count from shared map
+      long totalContainers = SHARED_CONTAINER_KEY_COUNT_MAP.size();
+
+      // Final flush: Shared container count map
+      if (!flushAndCommitContainerKeyInfoToDB(Collections.emptyMap(), SHARED_CONTAINER_KEY_COUNT_MAP, reconContainerMetadataManager)) {
+        LOG.error("Failed to flush shared container count map for {}", taskName);
         return false;
       }
 
+      // Write total container count once at the end (after all processing)
+      if (totalContainers > 0) {
+        reconContainerMetadataManager.incrementContainerCountBy(totalContainers);
+      }
+
+      // Decrement active task counter and cleanup if this is the last task
+      int remainingTasks = ACTIVE_TASK_COUNT.decrementAndGet();

Review Comment:
   The ACTIVE_TASK_COUNT.decrementAndGet() operation is already atomic and thread-safe.
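   
   For context, here is a minimal standalone sketch of that pattern (not the PR's code; `TASK_COUNT`, `onTaskFinish`, and `cleanupSharedState` are hypothetical names) showing why the return value of `decrementAndGet()` alone is enough to elect exactly one finishing task to run the last-task cleanup:
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class LastTaskCleanupSketch {
     // Hypothetical counter standing in for ACTIVE_TASK_COUNT in the PR.
     private static final AtomicInteger TASK_COUNT = new AtomicInteger();
   
     static void onTaskStart() {
       TASK_COUNT.incrementAndGet();
     }
   
     static void onTaskFinish() {
       // decrementAndGet() is a single atomic read-modify-write, so exactly one
       // finishing task observes the counter reach zero and performs cleanup.
       if (TASK_COUNT.decrementAndGet() == 0) {
         cleanupSharedState();
       }
     }
   
     // Hypothetical stand-in for clearing shared state (e.g. a shared map).
     private static void cleanupSharedState() {
       System.out.println("last task finished, cleaning up shared state");
     }
   
     public static void main(String[] args) throws InterruptedException {
       Thread[] workers = new Thread[4];
       for (int i = 0; i < workers.length; i++) {
         onTaskStart();
         workers[i] = new Thread(LastTaskCleanupSketch::onTaskFinish);
         workers[i].start();
       }
       for (Thread w : workers) {
         w.join();
       }
     }
   }
   ```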



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

