sumitagrawl commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2579853616
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -142,11 +157,76 @@ public TaskResult reprocess(OMMetadataManager
omMetadataManager) {
if (!replicatedSizeMap.isEmpty()) {
writeDataToDB(replicatedSizeMap);
}
+ long endTime = Time.monotonicNow();
+ long durationMs = endTime - startTime;
- LOG.debug("Completed a 'reprocess' run of OmTableInsightTask.");
+ LOG.info("{}: Reprocess completed in {} ms", getTaskName(), durationMs);
return buildTaskResult(true);
}
+ /**
+ * Check if table uses non-String keys (e.g., OzoneTokenIdentifier).
+ * These tables cannot use StringCodec and must be processed sequentially.
+ */
+ private boolean usesNonStringKeys(String tableName) {
+ return tableName.equals("dTokenTable") ||
tableName.equals("s3SecretTable");
+ }
+
+ /**
+ * Process table sequentially using raw iterator (no type assumptions).
+ * Used for tables with non-String keys or as fallback.
+ */
+ private void processTableSequentially(String tableName, Table<?, ?> table)
throws IOException {
+ LOG.info("{}: Processing table {} sequentially (non-String keys)",
getTaskName(), tableName);
+
+ Table<String, ?> stringTable = (Table<String, ?>) table;
+ try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator =
stringTable.iterator()) {
+ long count = Iterators.size(iterator);
+ objectCountMap.put(getTableCountKeyFromTable(tableName), count);
+ }
+ }
+
+ /**
+ * Process table in parallel using multiple iterators and workers.
+ * Only for tables with String keys.
+ */
+ private void processTableInParallel(String tableName, Table<?, ?> table,
Review Comment:
no need pass table here, get table with StringCodec as done in try-catch and
use same if required other place
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -142,11 +157,76 @@ public TaskResult reprocess(OMMetadataManager
omMetadataManager) {
if (!replicatedSizeMap.isEmpty()) {
writeDataToDB(replicatedSizeMap);
}
+ long endTime = Time.monotonicNow();
+ long durationMs = endTime - startTime;
- LOG.debug("Completed a 'reprocess' run of OmTableInsightTask.");
+ LOG.info("{}: Reprocess completed in {} ms", getTaskName(), durationMs);
return buildTaskResult(true);
}
+ /**
+ * Check if table uses non-String keys (e.g., OzoneTokenIdentifier).
+ * These tables cannot use StringCodec and must be processed sequentially.
+ */
+ private boolean usesNonStringKeys(String tableName) {
+ return tableName.equals("dTokenTable") ||
tableName.equals("s3SecretTable");
+ }
+
+ /**
+ * Process table sequentially using raw iterator (no type assumptions).
+ * Used for tables with non-String keys or as fallback.
+ */
+ private void processTableSequentially(String tableName, Table<?, ?> table)
throws IOException {
Review Comment:
we can get table inside only with keyIterator(), no need value iterator.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +124,108 @@ public static boolean reprocess(OMMetadataManager
omMetadataManager,
ReconContainerMetadataManager
reconContainerMetadataManager,
BucketLayout bucketLayout,
String taskName,
- long
containerKeyFlushToDBMaxThreshold) {
- long omKeyCount = 0;
- Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
- Map<Long, Long> containerKeyCountMap = new HashMap<>();
+ long
containerKeyFlushToDBMaxThreshold,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory) {
try {
- LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName,
bucketLayout);
Instant start = Instant.now();
- // Ensure the tables are truncated only once
- truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+ // Perform one-time initialization (truncate tables + clear shared map)
+ initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager,
taskName);
- // Get the appropriate table based on BucketLayout
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- // Iterate through the table and process keys
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap,
containerKeyCountMap,
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1,
containerKeyFlushToDBMaxThreshold / maxWorkers);
+
+ // Map thread IDs to worker-specific local maps for lockless updates
+ Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new
ConcurrentHashMap<>();
+
+ Object flushLock = new Object();
+
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ try {
+ // Get or create this worker's private local map using thread ID
+ Map<ContainerKeyPrefix, Integer> myLocalMap =
allLocalMaps.computeIfAbsent(
+ Thread.currentThread().getId(), k -> new ConcurrentHashMap<>());
+
+ handleKeyReprocess(kv.getKey(), kv.getValue(), myLocalMap,
SHARED_CONTAINER_KEY_COUNT_MAP,
reconContainerMetadataManager);
- omKeyCount++;
- // Check and flush data if it reaches the batch threshold
- if (!checkAndCallFlushToDB(containerKeyMap,
containerKeyFlushToDBMaxThreshold,
+ // Flush this worker's map when it reaches threshold
+ if (myLocalMap.size() >= PER_WORKER_THRESHOLD) {
+ synchronized (flushLock) {
Review Comment:
we do not need lock as containerKeyPrefix is unique, added to table.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +124,108 @@ public static boolean reprocess(OMMetadataManager
omMetadataManager,
ReconContainerMetadataManager
reconContainerMetadataManager,
BucketLayout bucketLayout,
String taskName,
- long
containerKeyFlushToDBMaxThreshold) {
- long omKeyCount = 0;
- Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
- Map<Long, Long> containerKeyCountMap = new HashMap<>();
+ long
containerKeyFlushToDBMaxThreshold,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory) {
try {
- LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName,
bucketLayout);
Instant start = Instant.now();
- // Ensure the tables are truncated only once
- truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+ // Perform one-time initialization (truncate tables + clear shared map)
+ initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager,
taskName);
- // Get the appropriate table based on BucketLayout
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- // Iterate through the table and process keys
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap,
containerKeyCountMap,
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1,
containerKeyFlushToDBMaxThreshold / maxWorkers);
+
+ // Map thread IDs to worker-specific local maps for lockless updates
+ Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new
ConcurrentHashMap<>();
+
+ Object flushLock = new Object();
+
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ try {
+ // Get or create this worker's private local map using thread ID
+ Map<ContainerKeyPrefix, Integer> myLocalMap =
allLocalMaps.computeIfAbsent(
+ Thread.currentThread().getId(), k -> new ConcurrentHashMap<>());
+
+ handleKeyReprocess(kv.getKey(), kv.getValue(), myLocalMap,
SHARED_CONTAINER_KEY_COUNT_MAP,
reconContainerMetadataManager);
- omKeyCount++;
- // Check and flush data if it reaches the batch threshold
- if (!checkAndCallFlushToDB(containerKeyMap,
containerKeyFlushToDBMaxThreshold,
+ // Flush this worker's map when it reaches threshold
+ if (myLocalMap.size() >= PER_WORKER_THRESHOLD) {
+ synchronized (flushLock) {
+ if (!flushAndCommitContainerKeyInfoToDB(myLocalMap,
Collections.emptyMap(),
+ reconContainerMetadataManager)) {
+ throw new UncheckedIOException(new IOException("Unable to
flush containerKey information to the DB"));
+ }
+ }
+ }
+ return null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ };
+
+ try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
+ new ParallelTableIteratorOperation<>(omMetadataManager,
omKeyInfoTable,
+ StringCodec.get(), maxIterators, maxWorkers,
maxKeysInMemory, PER_WORKER_THRESHOLD)) {
+ keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
+ }
+
+ // Final flush: Write remaining entries from all worker local maps to DB
+ for (Map<ContainerKeyPrefix, Integer> workerLocalMap :
allLocalMaps.values()) {
+ if (!workerLocalMap.isEmpty()) {
+ if (!flushAndCommitContainerKeyInfoToDB(workerLocalMap,
Collections.emptyMap(),
reconContainerMetadataManager)) {
- LOG.error("Failed to flush container key data for {}", taskName);
+ LOG.error("Failed to flush worker local map for {}", taskName);
return false;
}
}
}
- // Final flush and commit
- if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap,
containerKeyCountMap, reconContainerMetadataManager)) {
- LOG.error("Failed to flush Container Key data to DB for {}", taskName);
+ // Capture total container count from shared map
+ long totalContainers = SHARED_CONTAINER_KEY_COUNT_MAP.size();
+
+ // Final flush: Shared container count map
+ if (!flushAndCommitContainerKeyInfoToDB(Collections.emptyMap(),
SHARED_CONTAINER_KEY_COUNT_MAP, reconContainerMetadataManager)) {
+ LOG.error("Failed to flush shared container count map for {}",
taskName);
return false;
}
+ // Write total container count once at the end (after all processing)
+ if (totalContainers > 0) {
+
reconContainerMetadataManager.incrementContainerCountBy(totalContainers);
+ }
+
+ // Decrement active task counter and cleanup if this is the last task
+ int remainingTasks = ACTIVE_TASK_COUNT.decrementAndGet();
Review Comment:
perfrom this in INITIALIZATION_LOCK, decrement and validate if count is "0",
if "0", perform update
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -142,11 +157,76 @@ public TaskResult reprocess(OMMetadataManager
omMetadataManager) {
if (!replicatedSizeMap.isEmpty()) {
writeDataToDB(replicatedSizeMap);
}
+ long endTime = Time.monotonicNow();
+ long durationMs = endTime - startTime;
- LOG.debug("Completed a 'reprocess' run of OmTableInsightTask.");
+ LOG.info("{}: Reprocess completed in {} ms", getTaskName(), durationMs);
return buildTaskResult(true);
}
+ /**
+ * Check if table uses non-String keys (e.g., OzoneTokenIdentifier).
+ * These tables cannot use StringCodec and must be processed sequentially.
+ */
+ private boolean usesNonStringKeys(String tableName) {
+ return tableName.equals("dTokenTable") ||
tableName.equals("s3SecretTable");
+ }
+
+ /**
+ * Process table sequentially using raw iterator (no type assumptions).
+ * Used for tables with non-String keys or as fallback.
+ */
+ private void processTableSequentially(String tableName, Table<?, ?> table)
throws IOException {
+ LOG.info("{}: Processing table {} sequentially (non-String keys)",
getTaskName(), tableName);
+
+ Table<String, ?> stringTable = (Table<String, ?>) table;
+ try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator =
stringTable.iterator()) {
+ long count = Iterators.size(iterator);
+ objectCountMap.put(getTableCountKeyFromTable(tableName), count);
+ }
+ }
+
+ /**
+ * Process table in parallel using multiple iterators and workers.
+ * Only for tables with String keys.
+ */
+ private void processTableInParallel(String tableName, Table<?, ?> table,
+ OMMetadataManager omMetadataManager)
throws Exception {
+ int workerCount = 2; // Only 2 workers needed for simple counting
+ long loggingThreshold = calculateLoggingThreshold(table);
+
+ AtomicLong count = new AtomicLong(0);
+
+ try (ParallelTableIteratorOperation<String, byte[]> parallelIter = new
ParallelTableIteratorOperation<>(
+ omMetadataManager, omMetadataManager.getStore()
+ .getTable(tableName, StringCodec.get(), ByteArrayCodec.get(),
TableCache.CacheType.NO_CACHE), StringCodec.get(),
+ maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
+
+ parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
+ if (kv != null) {
+ count.incrementAndGet();
+ }
+ return null;
+ });
+ }
+
+ objectCountMap.put(getTableCountKeyFromTable(tableName), count.get());
+ }
+
+ /**
+ * Calculate logging threshold based on table size.
+ * Logs progress every 1% of total keys, minimum 1.
+ */
+ private long calculateLoggingThreshold(Table<?, ?> table) {
Review Comment:
can pass estimatedCount directly.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +124,108 @@ public static boolean reprocess(OMMetadataManager
omMetadataManager,
ReconContainerMetadataManager
reconContainerMetadataManager,
BucketLayout bucketLayout,
String taskName,
- long
containerKeyFlushToDBMaxThreshold) {
- long omKeyCount = 0;
- Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
- Map<Long, Long> containerKeyCountMap = new HashMap<>();
+ long
containerKeyFlushToDBMaxThreshold,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory) {
try {
- LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName,
bucketLayout);
Instant start = Instant.now();
- // Ensure the tables are truncated only once
- truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+ // Perform one-time initialization (truncate tables + clear shared map)
+ initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager,
taskName);
- // Get the appropriate table based on BucketLayout
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- // Iterate through the table and process keys
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap,
containerKeyCountMap,
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1,
containerKeyFlushToDBMaxThreshold / maxWorkers);
+
+ // Map thread IDs to worker-specific local maps for lockless updates
+ Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new
ConcurrentHashMap<>();
+
+ Object flushLock = new Object();
+
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ try {
+ // Get or create this worker's private local map using thread ID
+ Map<ContainerKeyPrefix, Integer> myLocalMap =
allLocalMaps.computeIfAbsent(
Review Comment:
rename myLocalMap to containerKeyPrefixMap
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -105,60 +105,96 @@ public static void
truncateFileCountTableIfNeeded(ReconFileMetadataManager recon
public static ReconOmTask.TaskResult reprocess(OMMetadataManager
omMetadataManager,
ReconFileMetadataManager
reconFileMetadataManager,
BucketLayout bucketLayout,
- String taskName) {
- LOG.info("Starting RocksDB Reprocess for {}", taskName);
- Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
- long startTime = Time.monotonicNow();
+ String taskName,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory,
+ long
fileSizeCountFlushThreshold) {
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName,
bucketLayout);
+ Map<FileSizeCountKey, Long> fileSizeCountMap = new ConcurrentHashMap<>();
+ long overallStartTime = Time.monotonicNow();
// Ensure the file count table is truncated only once during reprocess
truncateFileCountTableIfNeeded(reconFileMetadataManager, taskName);
boolean status = reprocessBucketLayout(
- bucketLayout, omMetadataManager, fileSizeCountMap,
reconFileMetadataManager, taskName);
+ bucketLayout, omMetadataManager, fileSizeCountMap,
reconFileMetadataManager, taskName,
+ maxIterators, maxWorkers, maxKeysInMemory,
fileSizeCountFlushThreshold);
if (!status) {
return buildTaskResult(taskName, false);
}
+ // Write remaining counts to DB (no global lock needed - FSO and OBS are
mutually exclusive)
writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
-
- long endTime = Time.monotonicNow();
- LOG.info("{} completed RocksDB Reprocess in {} ms.", taskName, (endTime -
startTime));
+
+ long totalDurationMs = Time.monotonicNow() - overallStartTime;
+ double durationSeconds = (double) totalDurationMs / 1000.0;
+
+ LOG.info("{}: Reprocess completed in {} sec", taskName, durationSeconds);
return buildTaskResult(taskName, true);
}
/**
- * Iterates over the OM DB keys for the given bucket layout and updates the
fileSizeCountMap (RocksDB version).
+ * Iterates over the OM DB keys for the given bucket layout using lockless
per-worker maps.
+ * Each worker maintains its own map to eliminate read lock contention.
*/
public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
OMMetadataManager
omMetadataManager,
Map<FileSizeCountKey, Long>
fileSizeCountMap,
ReconFileMetadataManager
reconFileMetadataManager,
- String taskName) {
+ String taskName,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory,
+ long
fileSizeCountFlushThreshold) {
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- int totalKeysProcessed = 0;
-
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter =
- omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
- totalKeysProcessed++;
- // Flush to RocksDB periodically.
- if (fileSizeCountMap.size() >= 100000) {
- // For reprocess, we don't need to check existing values since table
was truncated
- LOG.debug("Flushing {} accumulated counts to RocksDB for {}",
fileSizeCountMap.size(), taskName);
- writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
- fileSizeCountMap.clear();
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1, fileSizeCountFlushThreshold
/ maxWorkers);
+
+ // Map thread IDs to worker-specific maps for lockless updates
+ Map<Long, Map<FileSizeCountKey, Long>> allMap = new ConcurrentHashMap<>();
+
+ // Lock for coordinating DB flush operations only
+ Object flushLock = new Object();
+
+ // Lambda executed by workers for each key
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ // Get or create this worker's private map using thread ID
+ Map<FileSizeCountKey, Long> myMap = allMap.computeIfAbsent(
+ Thread.currentThread().getId(), k -> new HashMap<>());
+
+ // Update worker's private map without locks
+ handlePutKeyEvent(kv.getValue(), myMap);
+
+ // Flush this worker's map when it reaches threshold
+ if (myMap.size() >= PER_WORKER_THRESHOLD) {
+ synchronized (flushLock) {
+ writeCountsToDB(myMap, reconFileMetadataManager);
+ myMap.clear();
Review Comment:
rename myMap variable as fileSizeCountMap
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -51,31 +59,63 @@ public abstract class ContainerKeyMapperHelper {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerKeyMapperHelper.class);
- // Static lock to guard table truncation.
- private static final Object TRUNCATE_LOCK = new Object();
+ // Single lock to guard all initialization operations (table truncation +
map clearing)
+ private static final Object INITIALIZATION_LOCK = new Object();
/**
- * Ensures that the container key tables are truncated only once before
reprocessing.
- * Uses an AtomicBoolean to track if truncation has already been performed.
- *
- * @param reconContainerMetadataManager The metadata manager instance
responsible for DB operations.
+ * Reference counter to track how many tasks are actively using the shared
map.
+ * Initialized to 2 (FSO + OBS tasks) during initialization.
+ * Each task decrements on completion. Last task (count reaches 0) clears
the shared map.
+ */
+ private static final AtomicInteger ACTIVE_TASK_COUNT = new AtomicInteger(0);
+
+ /**
+ * SHARED across all tasks (FSO + OBS) for cross-task synchronization.
+ * Maps: ContainerId -> AtomicLong (key count in that container)
+ * Purpose: Prevents data corruption when FSO and OBS tasks run concurrently
+ * and both write to the same container IDs. Both tasks accumulate into this
+ * single shared map, ensuring final DB write contains complete totals.
*/
- public static void truncateTablesIfNeeded(ReconContainerMetadataManager
reconContainerMetadataManager,
- String taskName) {
- synchronized (TRUNCATE_LOCK) {
- if (ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false,
true)) {
+ private static final Map<Long, AtomicLong> SHARED_CONTAINER_KEY_COUNT_MAP =
new ConcurrentHashMap<>();
+
+ /**
+ * Performs one-time initialization for Container Key Mapper tasks.
+ * This includes:
+ * 1. Truncating container key tables in DB
+ * 2. Clearing the shared container count map
+ *
+ * This method is called by both FSO and OBS tasks at the start of reprocess.
+ * Only the first task to call this will perform initialization.
+ *
+ * @param reconContainerMetadataManager The metadata manager for DB
operations
+ * @param taskName Name of the task calling this method (for logging)
+ * @throws RuntimeException if initialization fails
+ */
+ private static void initializeContainerKeyMapperIfNeeded(
+ ReconContainerMetadataManager reconContainerMetadataManager,
+ String taskName) {
+
+ synchronized (INITIALIZATION_LOCK) {
+ // Check if already initialized by another task
+ if (ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.compareAndSet(false,
true)) {
try {
- // Perform table truncation
+ // Step 1: Truncate tables
reconContainerMetadataManager.reinitWithNewContainerDataFromOm(Collections.emptyMap());
- LOG.debug("Successfully truncated container key tables.");
+
+ // Step 2: Clear shared map
+ SHARED_CONTAINER_KEY_COUNT_MAP.clear();
+
+ // Step 3: Initialize reference counter (2 tasks: FSO + OBS)
+ ACTIVE_TASK_COUNT.set(2);
Review Comment:
need increment in synchronized_lock by one every task, and decrement in same
lock when finish
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +124,108 @@ public static boolean reprocess(OMMetadataManager
omMetadataManager,
ReconContainerMetadataManager
reconContainerMetadataManager,
BucketLayout bucketLayout,
String taskName,
- long
containerKeyFlushToDBMaxThreshold) {
- long omKeyCount = 0;
- Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
- Map<Long, Long> containerKeyCountMap = new HashMap<>();
+ long
containerKeyFlushToDBMaxThreshold,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory) {
try {
- LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName,
bucketLayout);
Instant start = Instant.now();
- // Ensure the tables are truncated only once
- truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+ // Perform one-time initialization (truncate tables + clear shared map)
+ initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager,
taskName);
- // Get the appropriate table based on BucketLayout
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- // Iterate through the table and process keys
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap,
containerKeyCountMap,
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1,
containerKeyFlushToDBMaxThreshold / maxWorkers);
+
+ // Map thread IDs to worker-specific local maps for lockless updates
+ Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new
ConcurrentHashMap<>();
+
+ Object flushLock = new Object();
+
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ try {
+ // Get or create this worker's private local map using thread ID
+ Map<ContainerKeyPrefix, Integer> myLocalMap =
allLocalMaps.computeIfAbsent(
+ Thread.currentThread().getId(), k -> new ConcurrentHashMap<>());
+
+ handleKeyReprocess(kv.getKey(), kv.getValue(), myLocalMap,
SHARED_CONTAINER_KEY_COUNT_MAP,
reconContainerMetadataManager);
- omKeyCount++;
- // Check and flush data if it reaches the batch threshold
- if (!checkAndCallFlushToDB(containerKeyMap,
containerKeyFlushToDBMaxThreshold,
+ // Flush this worker's map when it reaches threshold
+ if (myLocalMap.size() >= PER_WORKER_THRESHOLD) {
+ synchronized (flushLock) {
+ if (!flushAndCommitContainerKeyInfoToDB(myLocalMap,
Collections.emptyMap(),
+ reconContainerMetadataManager)) {
+ throw new UncheckedIOException(new IOException("Unable to
flush containerKey information to the DB"));
+ }
+ }
+ }
+ return null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ };
+
+ try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
+ new ParallelTableIteratorOperation<>(omMetadataManager,
omKeyInfoTable,
+ StringCodec.get(), maxIterators, maxWorkers,
maxKeysInMemory, PER_WORKER_THRESHOLD)) {
+ keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
+ }
+
+ // Final flush: Write remaining entries from all worker local maps to DB
+ for (Map<ContainerKeyPrefix, Integer> workerLocalMap :
allLocalMaps.values()) {
+ if (!workerLocalMap.isEmpty()) {
+ if (!flushAndCommitContainerKeyInfoToDB(workerLocalMap,
Collections.emptyMap(),
reconContainerMetadataManager)) {
- LOG.error("Failed to flush container key data for {}", taskName);
+ LOG.error("Failed to flush worker local map for {}", taskName);
return false;
}
}
}
- // Final flush and commit
- if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap,
containerKeyCountMap, reconContainerMetadataManager)) {
- LOG.error("Failed to flush Container Key data to DB for {}", taskName);
+ // Capture total container count from shared map
+ long totalContainers = SHARED_CONTAINER_KEY_COUNT_MAP.size();
+
+ // Final flush: Shared container count map
+ if (!flushAndCommitContainerKeyInfoToDB(Collections.emptyMap(),
SHARED_CONTAINER_KEY_COUNT_MAP, reconContainerMetadataManager)) {
Review Comment:
flush to be done at last, else can have twice flush.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -95,39 +109,40 @@ public void init() {
}
/**
- * Iterates the rows of each table in the OM snapshot DB and calculates the
- * counts and sizes for table data.
- * <p>
- * For tables that require data size calculation
- * (as returned by getTablesToCalculateSize), both the number of
- * records (count) and total data size of the records are calculated.
- * For all other tables, only the count of records is calculated.
+ * Reprocess all OM tables to calculate counts and sizes.
+ * Handler tables (with size calculation) use sequential iteration.
+ * Simple tables (count only) use parallel iteration with String keys,
+ * or sequential for non-String key tables.
*
- * @param omMetadataManager OM Metadata instance.
- * @return Pair
+ * @param omMetadataManager OM Metadata instance
+ * @return TaskResult indicating success or failure
*/
@Override
public TaskResult reprocess(OMMetadataManager omMetadataManager) {
+ LOG.info("{}: Starting reprocess", getTaskName());
+ long startTime = Time.monotonicNow();
+
init();
for (String tableName : tables) {
- Table table = omMetadataManager.getTable(tableName);
+ Table<?, ?> table = omMetadataManager.getTable(tableName);
- try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator
- = table.iterator()) {
+ try {
if (tableHandlers.containsKey(tableName)) {
- Triple<Long, Long, Long> details =
- tableHandlers.get(tableName).getTableSizeAndCount(iterator);
- objectCountMap.put(getTableCountKeyFromTable(tableName),
- details.getLeft());
- unReplicatedSizeMap.put(
- getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle());
- replicatedSizeMap.put(getReplicatedSizeKeyFromTable(tableName),
- details.getRight());
+ Table<String, ?> stringTable = (Table<String, ?>) table;
+ try (TableIterator<String, ? extends Table.KeyValue<String, ?>>
iterator = stringTable.iterator()) {
Review Comment:
we can remove table object and getTableSizeAndCount can get table object and
return count
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]