sumitagrawl commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2549588843


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java:
##########
@@ -95,6 +95,8 @@ public final class ReconConstants {
 
   public static final AtomicBoolean CONTAINER_KEY_TABLES_TRUNCATED = new 
AtomicBoolean(false);
 
+  public static final AtomicBoolean CONTAINER_KEY_COUNT_MAP_INITIALIZED = new 
AtomicBoolean(false);

Review Comment:
   no need this Atomic boolean.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -80,66 +100,139 @@ public static void 
truncateTablesIfNeeded(ReconContainerMetadataManager reconCon
     }
   }
 
+  /**
+   * Ensures the shared container count map is cleared once per reprocess 
cycle.
+   * This must be called by the first task that starts reprocessing to prevent
+   * cross-task data corruption where FSO and OBS tasks overwrite each other's 
counts.
+   */
+  private static void initializeSharedContainerCountMapIfNeeded(String 
taskName) {
+    synchronized (SHARED_MAP_LOCK) {

Review Comment:
   init of map and clear can be done using truncate_lock, rename this lock.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -390,66 +487,51 @@ private static void writeToTheDB(Map<ContainerKeyPrefix, 
Integer> containerKeyMa
    *
    * @param key key String
    * @param omKeyInfo omKeyInfo value
-   * @param containerKeyMap we keep the added containerKeys in this map
-   *                        to allow incremental batching to containerKeyTable
-   * @param containerKeyCountMap we keep the containerKey counts in this map
-   *                             to allow batching to containerKeyCountTable
-   *                             after reprocessing is done
+   * @param localContainerKeyMap Local per-task map for ContainerKeyPrefix 
mappings
+   *                             (cleared on flush, not shared between tasks)
+   * @param sharedContainerKeyCountMap Shared cross-task map for container 
counts
+   *                                   (FSO + OBS both update this, uses 
AtomicLong for thread safety)
    * @param reconContainerMetadataManager Recon metadata manager instance
    * @throws IOException if unable to write to recon DB.
    */
   public static void handleKeyReprocess(String key,
                                         OmKeyInfo omKeyInfo,
-                                        Map<ContainerKeyPrefix, Integer> 
containerKeyMap,
-                                        Map<Long, Long> containerKeyCountMap,
+                                        Map<ContainerKeyPrefix, Integer> 
localContainerKeyMap,
+                                        Map<Long, AtomicLong> 
sharedContainerKeyCountMap,
                                         ReconContainerMetadataManager 
reconContainerMetadataManager)
       throws IOException {
 
-    long containerCountToIncrement = 0;
-
     for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : 
omKeyInfo.getKeyLocationVersions()) {
       long keyVersion = omKeyLocationInfoGroup.getVersion();
       for (OmKeyLocationInfo omKeyLocationInfo : 
omKeyLocationInfoGroup.getLocationList()) {
         long containerId = omKeyLocationInfo.getContainerID();
         ContainerKeyPrefix containerKeyPrefix = 
ContainerKeyPrefix.get(containerId, key, keyVersion);
 
-        if 
(reconContainerMetadataManager.getCountForContainerKeyPrefix(containerKeyPrefix)
 == 0
-            && !containerKeyMap.containsKey(containerKeyPrefix)) {
+        // During reprocess, tables are empty so skip DB lookup - just check 
in-memory map
+        if (!localContainerKeyMap.containsKey(containerKeyPrefix)) {
           // Save on writes. No need to save same container-key prefix mapping 
again.
-          containerKeyMap.put(containerKeyPrefix, 1);
-
-          // Check if container already exists; if not, increment the count
-          if (!reconContainerMetadataManager.doesContainerExists(containerId)
-              && !containerKeyCountMap.containsKey(containerId)) {
-            containerCountToIncrement++;
-          }
-
-          // Update the count of keys for the given containerID
-          long keyCount = containerKeyCountMap.getOrDefault(containerId,
-              
reconContainerMetadataManager.getKeyCountForContainer(containerId));
+          localContainerKeyMap.put(containerKeyPrefix, 1);
 
-          containerKeyCountMap.put(containerId, keyCount + 1);
+          // Thread-safe increment using computeIfAbsent (cross-task safe: FSO 
+ OBS)
+          sharedContainerKeyCountMap.computeIfAbsent(containerId, k -> new 
AtomicLong(0))
+              .incrementAndGet();
         }
       }
     }
-
-    if (containerCountToIncrement > 0) {
-      
reconContainerMetadataManager.incrementContainerCountBy(containerCountToIncrement);
-    }
+    // Container count will be written once at the end of reprocess, not here 
(Derby optimization)
   }
 
   public static boolean flushAndCommitContainerKeyInfoToDB(
-      Map<ContainerKeyPrefix, Integer> containerKeyMap,
-      Map<Long, Long> containerKeyCountMap,
+      Map<ContainerKeyPrefix, Integer> localContainerKeyMap,
+      Map<Long, AtomicLong> sharedContainerKeyCountMap,
       ReconContainerMetadataManager reconContainerMetadataManager) {
 
     try {
       // No deleted container list needed since "reprocess" only has put 
operations
-      writeToTheDB(containerKeyMap, containerKeyCountMap, 
Collections.emptyList(), reconContainerMetadataManager);
+      writeToTheDB(localContainerKeyMap, sharedContainerKeyCountMap, 
Collections.emptyList(), reconContainerMetadataManager);

Review Comment:
   only last task getting executed need flush and cleanup map, else data can 
got wrong.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -105,24 +120,45 @@ public static void 
truncateFileCountTableIfNeeded(ReconFileMetadataManager recon
   public static ReconOmTask.TaskResult reprocess(OMMetadataManager 
omMetadataManager,
                                                  ReconFileMetadataManager 
reconFileMetadataManager,
                                                  BucketLayout bucketLayout,
-                                                 String taskName) {
-    LOG.info("Starting RocksDB Reprocess for {}", taskName);
-    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
-    long startTime = Time.monotonicNow();
+                                                 String taskName,
+                                                 int maxIterators,
+                                                 int maxWorkers,
+                                                 int maxKeysInMemory) {
+    LOG.info("{}: Starting parallel RocksDB reprocess with {} iterators, {} 
workers for bucket layout {}",
+        taskName, maxIterators, maxWorkers, bucketLayout);
+    Map<FileSizeCountKey, Long> fileSizeCountMap = new ConcurrentHashMap<>();
+    long overallStartTime = Time.monotonicNow();
     
     // Ensure the file count table is truncated only once during reprocess
     truncateFileCountTableIfNeeded(reconFileMetadataManager, taskName);
     
+    long iterationStartTime = Time.monotonicNow();
     boolean status = reprocessBucketLayout(
-        bucketLayout, omMetadataManager, fileSizeCountMap, 
reconFileMetadataManager, taskName);
+        bucketLayout, omMetadataManager, fileSizeCountMap, 
reconFileMetadataManager, taskName,
+        maxIterators, maxWorkers, maxKeysInMemory);
     if (!status) {
       return buildTaskResult(taskName, false);
     }
+    long iterationEndTime = Time.monotonicNow();
     
-    writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
+    long writeStartTime = Time.monotonicNow();
+    // Acquire GLOBAL lock (cross-task) before writing to DB
+    FILE_COUNT_WRITE_LOCK.writeLock().lock();

Review Comment:
   FILE_COUNT_LOCK needs to be local lock



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to