ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2588203608


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -142,11 +157,76 @@ public TaskResult reprocess(OMMetadataManager 
omMetadataManager) {
     if (!replicatedSizeMap.isEmpty()) {
       writeDataToDB(replicatedSizeMap);
     }
+    long endTime = Time.monotonicNow();
+    long durationMs = endTime - startTime;
 
-    LOG.debug("Completed a 'reprocess' run of OmTableInsightTask.");
+    LOG.info("{}: Reprocess completed in {} ms", getTaskName(), durationMs);
     return buildTaskResult(true);
   }
 
+  /**
+   * Check if table uses non-String keys (e.g., OzoneTokenIdentifier).
+   * These tables cannot use StringCodec and must be processed sequentially.
+   */
+  private boolean usesNonStringKeys(String tableName) {
+    return tableName.equals("dTokenTable") || 
tableName.equals("s3SecretTable");
+  }
+
+  /**
+   * Process table sequentially using raw iterator (no type assumptions).
+   * Used for tables with non-String keys or as fallback.
+   */
+  private void processTableSequentially(String tableName, Table<?, ?> table) 
throws IOException {
+    LOG.info("{}: Processing table {} sequentially (non-String keys)", 
getTaskName(), tableName);
+    
+    Table<String, ?> stringTable = (Table<String, ?>) table;
+    try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator = 
stringTable.iterator()) {
+      long count = Iterators.size(iterator);
+      objectCountMap.put(getTableCountKeyFromTable(tableName), count);
+    }
+  }
+
+  /**
+   * Process table in parallel using multiple iterators and workers.
+   * Only for tables with String keys.
+   */
+  private void processTableInParallel(String tableName, Table<?, ?> table, 
+                                      OMMetadataManager omMetadataManager) 
throws Exception {
+    int workerCount = 2;  // Only 2 workers needed for simple counting
+    long loggingThreshold = calculateLoggingThreshold(table);
+    
+    AtomicLong count = new AtomicLong(0);
+
+    try (ParallelTableIteratorOperation<String, byte[]> parallelIter = new 
ParallelTableIteratorOperation<>(
+        omMetadataManager, omMetadataManager.getStore()
+        .getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), 
TableCache.CacheType.NO_CACHE), StringCodec.get(),
+        maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
+      
+      parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
+        if (kv != null) {
+          count.incrementAndGet();
+        }
+        return null;
+      });
+    }
+    
+    objectCountMap.put(getTableCountKeyFromTable(tableName), count.get());
+  }
+
+  /**
+   * Calculate logging threshold based on table size.
+   * Logs progress every 1% of total keys, minimum 1.
+   */
+  private long calculateLoggingThreshold(Table<?, ?> table) {

Review Comment:
   Implemented. Changed `calculateLoggingThreshold` to take a `long estimatedCount` parameter instead of a `Table`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to