ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2588205253
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -95,39 +109,40 @@ public void init() {
}
/**
- * Iterates the rows of each table in the OM snapshot DB and calculates the
- * counts and sizes for table data.
- * <p>
- * For tables that require data size calculation
- * (as returned by getTablesToCalculateSize), both the number of
- * records (count) and total data size of the records are calculated.
- * For all other tables, only the count of records is calculated.
+ * Reprocess all OM tables to calculate counts and sizes.
+ * Handler tables (with size calculation) use sequential iteration.
+ * Simple tables (count only) use parallel iteration with String keys,
+ * or sequential for non-String key tables.
*
- * @param omMetadataManager OM Metadata instance.
- * @return Pair
+ * @param omMetadataManager OM Metadata instance
+ * @return TaskResult indicating success or failure
*/
@Override
public TaskResult reprocess(OMMetadataManager omMetadataManager) {
+ LOG.info("{}: Starting reprocess", getTaskName());
+ long startTime = Time.monotonicNow();
+
init();
for (String tableName : tables) {
- Table table = omMetadataManager.getTable(tableName);
+ Table<?, ?> table = omMetadataManager.getTable(tableName);
- try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator
- = table.iterator()) {
+ try {
if (tableHandlers.containsKey(tableName)) {
- Triple<Long, Long, Long> details =
- tableHandlers.get(tableName).getTableSizeAndCount(iterator);
- objectCountMap.put(getTableCountKeyFromTable(tableName),
- details.getLeft());
- unReplicatedSizeMap.put(
- getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle());
- replicatedSizeMap.put(getReplicatedSizeKeyFromTable(tableName),
- details.getRight());
+ Table<String, ?> stringTable = (Table<String, ?>) table;
+ try (TableIterator<String, ? extends Table.KeyValue<String, ?>>
iterator = stringTable.iterator()) {
Review Comment:
Implemented. Modified the OmTableHandler interface and both of its implementations
(OpenKeysInsightHandler, DeletedKeysInsightHandler) to take tableName and
omMetadataManager instead of an iterator. The handlers now obtain the table and
create their iterators themselves.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -105,60 +105,96 @@ public static void
truncateFileCountTableIfNeeded(ReconFileMetadataManager recon
public static ReconOmTask.TaskResult reprocess(OMMetadataManager
omMetadataManager,
ReconFileMetadataManager
reconFileMetadataManager,
BucketLayout bucketLayout,
- String taskName) {
- LOG.info("Starting RocksDB Reprocess for {}", taskName);
- Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
- long startTime = Time.monotonicNow();
+ String taskName,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory,
+ long
fileSizeCountFlushThreshold) {
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName,
bucketLayout);
+ Map<FileSizeCountKey, Long> fileSizeCountMap = new ConcurrentHashMap<>();
+ long overallStartTime = Time.monotonicNow();
// Ensure the file count table is truncated only once during reprocess
truncateFileCountTableIfNeeded(reconFileMetadataManager, taskName);
boolean status = reprocessBucketLayout(
- bucketLayout, omMetadataManager, fileSizeCountMap,
reconFileMetadataManager, taskName);
+ bucketLayout, omMetadataManager, fileSizeCountMap,
reconFileMetadataManager, taskName,
+ maxIterators, maxWorkers, maxKeysInMemory,
fileSizeCountFlushThreshold);
if (!status) {
return buildTaskResult(taskName, false);
}
+ // Write remaining counts to DB (no global lock needed - FSO and OBS are
mutually exclusive)
writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
-
- long endTime = Time.monotonicNow();
- LOG.info("{} completed RocksDB Reprocess in {} ms.", taskName, (endTime -
startTime));
+
+ long totalDurationMs = Time.monotonicNow() - overallStartTime;
+ double durationSeconds = (double) totalDurationMs / 1000.0;
+
+ LOG.info("{}: Reprocess completed in {} sec", taskName, durationSeconds);
return buildTaskResult(taskName, true);
}
/**
- * Iterates over the OM DB keys for the given bucket layout and updates the
fileSizeCountMap (RocksDB version).
+ * Iterates over the OM DB keys for the given bucket layout using lockless
per-worker maps.
+ * Each worker maintains its own map to eliminate read lock contention.
*/
public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
OMMetadataManager
omMetadataManager,
Map<FileSizeCountKey, Long>
fileSizeCountMap,
ReconFileMetadataManager
reconFileMetadataManager,
- String taskName) {
+ String taskName,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory,
+ long
fileSizeCountFlushThreshold) {
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- int totalKeysProcessed = 0;
-
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter =
- omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
- totalKeysProcessed++;
- // Flush to RocksDB periodically.
- if (fileSizeCountMap.size() >= 100000) {
- // For reprocess, we don't need to check existing values since table
was truncated
- LOG.debug("Flushing {} accumulated counts to RocksDB for {}",
fileSizeCountMap.size(), taskName);
- writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
- fileSizeCountMap.clear();
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1, fileSizeCountFlushThreshold
/ maxWorkers);
+
+ // Map thread IDs to worker-specific maps for lockless updates
+ Map<Long, Map<FileSizeCountKey, Long>> allMap = new ConcurrentHashMap<>();
+
+ // Lock for coordinating DB flush operations only
+ Object flushLock = new Object();
+
+ // Lambda executed by workers for each key
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ // Get or create this worker's private map using thread ID
+ Map<FileSizeCountKey, Long> myMap = allMap.computeIfAbsent(
+ Thread.currentThread().getId(), k -> new HashMap<>());
+
+ // Update worker's private map without locks
+ handlePutKeyEvent(kv.getValue(), myMap);
+
+ // Flush this worker's map when it reaches threshold
+ if (myMap.size() >= PER_WORKER_THRESHOLD) {
+ synchronized (flushLock) {
+ writeCountsToDB(myMap, reconFileMetadataManager);
+ myMap.clear();
Review Comment:
Implemented. Renamed myMap to workerFileSizeCountMap for clarity.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]