ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2526049030
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -99,21 +110,38 @@ public static boolean reprocess(OMMetadataManager
omMetadataManager,
// Get the appropriate table based on BucketLayout
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- // Iterate through the table and process keys
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap,
containerKeyCountMap,
- reconContainerMetadataManager);
- omKeyCount++;
-
- // Check and flush data if it reaches the batch threshold
- if (!checkAndCallFlushToDB(containerKeyMap,
containerKeyFlushToDBMaxThreshold,
- reconContainerMetadataManager)) {
- LOG.error("Failed to flush container key data for {}", taskName);
- return false;
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ // Use parallel table iteration
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ try {
+ try {
+ lock.readLock().lock();
+ handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap,
containerKeyCountMap,
+ reconContainerMetadataManager);
+ } finally {
+ lock.readLock().unlock();
+ }
+ omKeyCount.incrementAndGet();
+ if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) {
+ try {
+ lock.writeLock().lock();
Review Comment:
1. The earlier review comment raised a concern about this line in the
`handleKeyReprocess` method of the `ContainerKeyMapperHelper` class:
- `AtomicLong count = sharedContainerKeyCountMap.computeIfAbsent(containerId,
k -> new AtomicLong(0));`
2. When two threads call `computeIfAbsent` with the same `containerId`,
`ConcurrentHashMap` performs the call atomically, so the initializer runs at
most once. Both threads receive a reference to the same `AtomicLong`. If the
container is new, its initial value is `0`.
3. On `long newCount = count.incrementAndGet();`, increments are atomic. One
thread updates `0 → 1`; the other observes the new value and performs `1 → 2`.
This uses lock-free CAS under the hood, so no updates are lost and no explicit
locking is needed.
4. Because exactly one thread observes `newCount == 1`, a new container is
counted only once. The design avoids the race condition while keeping the
processing parallel.
Below is a sketch of how two threads interleave on this code and how the
current design prevents the race condition:
```
Initial State:
sharedContainerKeyCountMap: {} (Container-999 doesn't exist)
═══════════════════════════════════════════════════════════
Thread 1 (FSO):                        Thread 2 (OBS):
Processing key1 with                   Processing key2 with
block in Container-999                 block in Container-999

Both call computeIfAbsent(999) at the EXACT SAME TIME
        ↓                                      ↓
═══════════════════════════════════════════════════════════
Inside ConcurrentHashMap (Internal Locking):

Thread 1:                              Thread 2:
├─ Acquires bin lock                   ├─ BLOCKED (waiting for lock)
│  for hash(999)                       │
├─ Checks: Does 999 exist? NO          │
├─ Executes lambda:                    │
│  new AtomicLong(0)                   │
│  → Creates AtomicLong@0x1234         │
├─ Stores in map:                      │
│  999 → AtomicLong@0x1234             │
├─ Releases lock                       │
└─ Returns: AtomicLong@0x1234          ├─ Acquires lock (now free)
                                       ├─ Checks: Does 999 exist? YES!
                                       ├─ Retrieves from map:
                                       │  999 → AtomicLong@0x1234
                                       ├─ Does NOT execute lambda
                                       ├─ Releases lock
                                       └─ Returns: AtomicLong@0x1234
═══════════════════════════════════════════════════════════
Result:
Thread 1: count = AtomicLong@0x1234 ┐
Thread 2: count = AtomicLong@0x1234 ├─ SAME OBJECT!
                                    ┘

Thread 1:                              Thread 2:
count = AtomicLong@0x1234              count = AtomicLong@0x1234 (SAME!)
        ↓                                      ↓
newCount = count.incrementAndGet()     newCount = count.incrementAndGet()
        ↓                                      ↓
      0 → 1                                  1 → 2
        ↓                                      ↓
newCount = 1                           newCount = 2
        ↓                                      ↓
if (1 == 1)? YES                       if (2 == 1)? NO
        ↓                                      ↓
containerCountToIncrement++            (skip)
        ↓
Final:                                 Final:
- Key count: 1                         - Key count: 2
- New container: 1                     - New container: 0
```
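The same interleaving can be checked with a small standalone program. This is only a sketch under stated assumptions, not the Recon code: the class `ContainerCountSketch`, the method `simulate`, and the counters `initializerCalls` and `newContainers` are hypothetical names introduced for illustration. Only the `computeIfAbsent` and `incrementAndGet` calls mirror the lines discussed above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical standalone sketch; not part of ContainerKeyMapperHelper.
public class ContainerCountSketch {

  /** Returns {initializer calls, new containers detected, final key count}. */
  static long[] simulate(int threads, long containerId) {
    Map<Long, AtomicLong> sharedContainerKeyCountMap = new ConcurrentHashMap<>();
    AtomicInteger initializerCalls = new AtomicInteger();
    AtomicInteger newContainers = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(threads);

    for (int i = 0; i < threads; i++) {
      pool.execute(() -> {
        try {
          start.await(); // release all workers together to maximize contention
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        // Same pattern as in the comment above: the initializer runs at most
        // once per absent key, and every thread gets the same AtomicLong.
        AtomicLong count = sharedContainerKeyCountMap.computeIfAbsent(
            containerId, k -> {
              initializerCalls.incrementAndGet();
              return new AtomicLong(0);
            });
        long newCount = count.incrementAndGet();
        if (newCount == 1) {
          // Only the thread that performed 0 -> 1 treats the container as new.
          newContainers.incrementAndGet();
        }
      });
    }

    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return new long[] {
        initializerCalls.get(),
        newContainers.get(),
        sharedContainerKeyCountMap.get(containerId).get()
    };
  }

  public static void main(String[] args) {
    long[] r = simulate(8, 999L);
    // prints "initializer calls = 1, new containers = 1, key count = 8"
    System.out.println("initializer calls = " + r[0]
        + ", new containers = " + r[1] + ", key count = " + r[2]);
  }
}
```

With eight workers contending on container 999, the initializer runs once, exactly one worker sees `newCount == 1`, and the final key count equals the number of workers, matching the two-thread walkthrough above.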