adoroszlai commented on code in PR #7796:
URL: https://github.com/apache/ozone/pull/7796#discussion_r1950371454


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java:
##########
@@ -76,49 +85,85 @@ public FileSizeCountTask(FileCountBySizeDao 
fileCountBySizeDao,
    */
   @Override
   public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
-    // Map to store the count of files based on file size
-    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
+    LOG.info("Starting reprocess of FileSizeCountTask...");
+    long startTime = System.currentTimeMillis();
 
-    // Delete all records from FILE_COUNT_BY_SIZE table
+    // Truncate table first
     int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
-    LOG.debug("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);
-
-    // Call reprocessBucket method for FILE_SYSTEM_OPTIMIZED bucket layout
-    boolean statusFSO =
-        reprocessBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED,
-            omMetadataManager,
-            fileSizeCountMap);
-    // Call reprocessBucket method for LEGACY bucket layout
-    boolean statusOBS =
-        reprocessBucketLayout(BucketLayout.LEGACY, omMetadataManager,
-            fileSizeCountMap);
-    if (!statusFSO && !statusOBS) {
-      return new ImmutablePair<>(getTaskName(), false);
+    LOG.debug("Cleared {} existing records from {}", execute, 
FILE_COUNT_BY_SIZE);
+
+    List<Future<Boolean>> futures = Arrays.asList(
+        submitReprocessTask("FSO", BucketLayout.FILE_SYSTEM_OPTIMIZED, 
omMetadataManager),
+        submitReprocessTask("LEGACY", BucketLayout.LEGACY, omMetadataManager)
+    );
+
+    boolean allSuccess = true;
+    try {
+      for (Future<Boolean> future : futures) {
+        if (!future.get()) {
+          allSuccess = false;
+        }
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Parallel processing failed: ", e);
+      allSuccess = false;
+      Thread.currentThread().interrupt();
     }
-    writeCountsToDB(fileSizeCountMap);
-    LOG.debug("Completed a 'reprocess' run of FileSizeCountTask.");
-    return new ImmutablePair<>(getTaskName(), true);
+
+    // Write any remaining entries to the database
+    if (!sharedFileSizeCountMap.isEmpty()) {
+      writeCountsToDB(sharedFileSizeCountMap);
+      sharedFileSizeCountMap.clear();
+    }
+
+    LOG.info("Reprocess completed. Success: {}. Time taken: {} ms",
+        allSuccess, (System.currentTimeMillis() - startTime));
+    return new ImmutablePair<>(getTaskName(), allSuccess);
+  }
+
+  /**
+   * Submits a reprocess task with proper thread naming.
+   */
+  private Future<Boolean> submitReprocessTask(String bucketType, BucketLayout 
layout,
+                                              OMMetadataManager 
omMetadataManager) {
+    return executorService.submit(() -> {
+      Thread currentThread = Thread.currentThread();
+      String originalName = currentThread.getName();
+      try {
+        currentThread.setName("FileSizeCountTask-" + bucketType + "-" + 
originalName);
+        return reprocessBucketLayout(layout, omMetadataManager);
+      } finally {
+        currentThread.setName(originalName); // Restore original name after 
execution
+      }
+    });
   }
 
-  private boolean reprocessBucketLayout(BucketLayout bucketLayout,
-                               OMMetadataManager omMetadataManager,
-                               Map<FileSizeCountKey, Long> fileSizeCountMap) {
-    Table<String, OmKeyInfo> omKeyInfoTable =
-        omMetadataManager.getKeyTable(bucketLayout);
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
-             keyIter = omKeyInfoTable.iterator()) {
-      while (keyIter.hasNext()) {
-        Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-        handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
-        //  The time complexity of .size() method is constant time, O(1)
-        if (fileSizeCountMap.size() >= 100000) {
-          writeCountsToDB(fileSizeCountMap);
-          fileSizeCountMap.clear();
+  private Boolean reprocessBucketLayout(BucketLayout bucketLayout,
+                                      OMMetadataManager omMetadataManager) {
+    long keysProcessed = 0;
+
+    try {
+      Table<String, OmKeyInfo> omKeyInfoTable =
+          omMetadataManager.getKeyTable(bucketLayout);
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+               keyIter = omKeyInfoTable.iterator()) {
+        while (keyIter.hasNext()) {
+          Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+          FileSizeCountKey key = getFileSizeCountKey(kv.getValue());
+
+          // Atomically update the count in the shared map
+          sharedFileSizeCountMap.merge(key, 1L, Long::sum);
+          keysProcessed++;
+
+          // Periodically write to the database to avoid memory overflow
+          if (sharedFileSizeCountMap.size() >= 100_000) {
+            writeCountsToDB(sharedFileSizeCountMap);
+            sharedFileSizeCountMap.clear();
+          }

Review Comment:
   This is not atomic: another thread could update `sharedFileSizeCountMap` between the `writeCountsToDB` call and `clear()`, and those updates would be lost. Consider draining entries atomically (e.g. removing keys one at a time from the `ConcurrentHashMap` as they are written) or swapping in a fresh map before flushing.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java:
##########
@@ -305,6 +353,16 @@ private boolean isFileCountBySizeTableEmpty() {
     return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0;
   }
 
+  private static class BucketLayoutProcessResult {
+    private final boolean success;
+    private final long keysProcessed;
+
+    BucketLayoutProcessResult(boolean success, long keysProcessed) {
+      this.success = success;
+      this.keysProcessed = keysProcessed;
+    }
+  }

Review Comment:
   This class is unused — please remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to