sumitagrawl commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2597255930
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DeletedKeysInsightHandler.java:
##########
@@ -112,19 +113,20 @@ public void handleUpdateEvent(OMDBUpdateEvent<String,
Object> event,
* pending deletion in Ozone.
*/
@Override
- public Triple<Long, Long, Long> getTableSizeAndCount(
- TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator)
- throws IOException {
+ public Triple<Long, Long, Long> getTableSizeAndCount(String tableName,
+ OMMetadataManager omMetadataManager) throws IOException {
long count = 0;
long unReplicatedSize = 0;
long replicatedSize = 0;
- if (iterator != null) {
+ @SuppressWarnings("unchecked")
+ Table<String, RepeatedOmKeyInfo> table =
+ (Table<String, RepeatedOmKeyInfo>)
omMetadataManager.getTable(tableName);
Review Comment:
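omMetadataManager.getDeletedTable() can be used directly here, instead of looking the table up by name via getTable(tableName) and applying an unchecked cast.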
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -84,66 +123,106 @@ public static boolean reprocess(OMMetadataManager
omMetadataManager,
ReconContainerMetadataManager
reconContainerMetadataManager,
BucketLayout bucketLayout,
String taskName,
- long
containerKeyFlushToDBMaxThreshold) {
- long omKeyCount = 0;
- Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
- Map<Long, Long> containerKeyCountMap = new HashMap<>();
+ long
containerKeyFlushToDBMaxThreshold,
+ int maxIterators,
+ int maxWorkers,
+ int maxKeysInMemory) {
try {
- LOG.debug("Starting a 'reprocess' run for {}.", taskName);
+ LOG.info("{}: Starting reprocess for bucket layout {}", taskName,
bucketLayout);
Instant start = Instant.now();
- // Ensure the tables are truncated only once
- truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+ // Perform one-time initialization (truncate tables + clear shared map)
+ initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager,
taskName);
- // Get the appropriate table based on BucketLayout
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
- // Iterate through the table and process keys
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap,
containerKeyCountMap,
+ // Divide threshold by worker count so each worker flushes independently
+ final long PER_WORKER_THRESHOLD = Math.max(1,
containerKeyFlushToDBMaxThreshold / maxWorkers);
+
+ // Map thread IDs to worker-specific local maps for lockless updates
+ Map<Long, Map<ContainerKeyPrefix, Integer>> allLocalMaps = new
ConcurrentHashMap<>();
+
+ Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+ try {
+ // Get or create this worker's private local map using thread ID
+ Map<ContainerKeyPrefix, Integer> containerKeyPrefixMap =
allLocalMaps.computeIfAbsent(
+ Thread.currentThread().getId(), k -> new ConcurrentHashMap<>());
+
+ handleKeyReprocess(kv.getKey(), kv.getValue(),
containerKeyPrefixMap, SHARED_CONTAINER_KEY_COUNT_MAP,
reconContainerMetadataManager);
- omKeyCount++;
- // Check and flush data if it reaches the batch threshold
- if (!checkAndCallFlushToDB(containerKeyMap,
containerKeyFlushToDBMaxThreshold,
+ // Flush this worker's map when it reaches threshold
+ if (containerKeyPrefixMap.size() >= PER_WORKER_THRESHOLD) {
+ if (!flushAndCommitContainerKeyInfoToDB(containerKeyPrefixMap,
Collections.emptyMap(),
+ reconContainerMetadataManager)) {
+ throw new UncheckedIOException(new IOException("Unable to flush
containerKey information to the DB"));
+ }
+ }
+ return null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ };
+
+ try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
+ new ParallelTableIteratorOperation<>(omMetadataManager,
omKeyInfoTable,
+ StringCodec.get(), maxIterators, maxWorkers,
maxKeysInMemory, PER_WORKER_THRESHOLD)) {
+ keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
Review Comment:
Can we check the impact of throwing UncheckedIOException here? Do we need to throw that exception, or should this return true/false instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]