ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2526049030


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -99,21 +110,38 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,
       // Get the appropriate table based on BucketLayout
      Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
 
-      // Iterate through the table and process keys
-      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = omKeyInfoTable.iterator()) {
-        while (keyIter.hasNext()) {
-          Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-          handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap,
-              reconContainerMetadataManager);
-          omKeyCount++;
-
-          // Check and flush data if it reaches the batch threshold
-          if (!checkAndCallFlushToDB(containerKeyMap, containerKeyFlushToDBMaxThreshold,
-              reconContainerMetadataManager)) {
-            LOG.error("Failed to flush container key data for {}", taskName);
-            return false;
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      // Use parallel table iteration
+      Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+        try {
+          try {
+            lock.readLock().lock();
+            handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap,
+                reconContainerMetadataManager);
+          } finally {
+            lock.readLock().unlock();
+          }
+          omKeyCount.incrementAndGet();
+          if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) {
+            try {
+              lock.writeLock().lock();
Review Comment:
   1. The review comment raised a concern about this line:
       - `AtomicLong count = sharedContainerKeyCountMap.computeIfAbsent(containerId, k -> new AtomicLong(0));`
   2. When two threads call `computeIfAbsent` with the same `containerId`, `ConcurrentHashMap` guarantees the initializer runs at most once for that key. Both threads receive a reference to the same `AtomicLong`, which starts at `0` if the container is new.
   3. On `long newCount = count.incrementAndGet();`, increments are atomic. One thread updates `0 → 1`; the other observes the new value and performs `1 → 2`. This uses lock-free CAS under the hood: no lost updates, no explicit locking.
   4. This design prevents race conditions while preserving parallelism and 
correctness.
   
   Below is a sketch of how two threads interact in this code and how the current design prevents a race condition.
   ```
   Initial State:
     sharedContainerKeyCountMap: {} (Container-999 doesn't exist)
   
   ═══════════════════════════════════════════════════════════
   
   Thread 1 (FSO):                    Thread 2 (OBS):
   Processing key1 with                Processing key2 with
   block in Container-999             block in Container-999
   
   Both call computeIfAbsent(999) at the EXACT SAME TIME
           ↓                                  ↓
   ═══════════════════════════════════════════════════════════
   
   Inside ConcurrentHashMap (Internal Locking):
   
   Thread 1:                          Thread 2:
   ├─ Acquires segment lock           ├─ BLOCKED (waiting for lock)
   │  for hash(999)                   │
   ├─ Checks: Does 999 exist? NO      │
   ├─ Executes lambda:                │
   │  new AtomicLong(0)               │
   │  → Creates AtomicLong@0x1234     │
   ├─ Stores in map:                  │
   │  999 → AtomicLong@0x1234         │
   ├─ Releases lock                   │
   └─ Returns: AtomicLong@0x1234      ├─ Acquires lock (now free)
                                      ├─ Checks: Does 999 exist? YES!
                                      ├─ Retrieves from map:
                                      │  999 → AtomicLong@0x1234
                                      ├─ Does NOT execute lambda
                                      ├─ Releases lock
                                      └─ Returns: AtomicLong@0x1234
   
   ═══════════════════════════════════════════════════════════
   
   Result:
   Thread 1: count = AtomicLong@0x1234  ┐
   Thread 2: count = AtomicLong@0x1234  ├─ SAME OBJECT!
                                        ┘
                                        
                                        
   Thread 1:                          Thread 2:
   count = AtomicLong@0x1234          count = AtomicLong@0x1234 (SAME!)
           ↓                                  ↓
   newCount = count.incrementAndGet() newCount = count.incrementAndGet()
              ↓                                ↓
           0 → 1                           1 → 2
              ↓                                ↓
   newCount = 1                          newCount = 2
              ↓                                ↓
   if (1 == 1)? YES                     if (2 == 1)? NO
              ↓                                ↓
   containerCountToIncrement++          (skip)
              ↓                                
   Final:                               Final:
   - Key count: 1                       - Key count: 2
   - New container: 1                   - New container: 0
   ```
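   
   The interleaving above can be reproduced in a small standalone sketch. This is illustrative only: the class name, the key `999L`, and the extra counters are mine, not taken from the PR, but the `computeIfAbsent` and `incrementAndGet` semantics are exactly what the PR relies on.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicLong;
   
   public class ComputeIfAbsentDemo {
     // Shared state, mirroring the role of sharedContainerKeyCountMap.
     static final ConcurrentHashMap<Long, AtomicLong> counts = new ConcurrentHashMap<>();
     static final AtomicInteger initializerRuns = new AtomicInteger();
     static final AtomicInteger newContainers = new AtomicInteger();
   
     public static void main(String[] args) throws InterruptedException {
       CountDownLatch start = new CountDownLatch(1);
   
       Runnable worker = () -> {
         try {
           start.await(); // line both threads up so they race on the same key
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           return;
         }
         // computeIfAbsent runs the initializer at most once per key;
         // both threads get back the SAME AtomicLong instance.
         AtomicLong count = counts.computeIfAbsent(999L, k -> {
           initializerRuns.incrementAndGet();
           return new AtomicLong(0);
         });
         long newCount = count.incrementAndGet(); // atomic CAS, no lost updates
         if (newCount == 1) {
           // Exactly one thread observes the first increment, so a
           // "new container" is counted exactly once.
           newContainers.incrementAndGet();
         }
       };
   
       Thread t1 = new Thread(worker);
       Thread t2 = new Thread(worker);
       t1.start();
       t2.start();
       start.countDown();
       t1.join();
       t2.join();
   
       // Guaranteed outcome regardless of interleaving:
       System.out.println("initializerRuns=" + initializerRuns.get()); // 1
       System.out.println("finalCount=" + counts.get(999L).get());     // 2
       System.out.println("newContainers=" + newContainers.get());     // 1
     }
   }
   ```
   
   Whichever thread wins the race inside `computeIfAbsent`, the final printed values are the same, which is the point of the diagram.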
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

