devmadhuu commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2527099621


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -702,145 +702,153 @@ public boolean syncDataFromOM() {
       try {
         long currentSequenceNumber = getCurrentOMDBSequenceNumber();
         LOG.info("Seq number of Recon's OM DB : {}", currentSequenceNumber);
-        boolean fullSnapshot = false;
+        boolean fullSnapshot = true;

Review Comment:
   This is wrong. It looks like you changed this for testing and forgot to restore it.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -43,14 +47,26 @@ public abstract class FileSizeCountTaskHelper {
   // Static lock object for table truncation synchronization
   private static final Object TRUNCATE_LOCK = new Object();
 
+  /**
+   * GLOBAL lock for cross-task synchronization during DB writes.
+   * 
+   * Scope: Shared across ALL tasks (FSO, OBS, Legacy)
+   * Protects: RocksDB read-modify-write operations
+   * Purpose: Ensures atomic updates when multiple tasks flush concurrently to 
the same bins
+   * 
+   * IMPORTANT: Callers MUST acquire this lock before calling 
writeCountsToDB().
+   * This lock should NOT be acquired inside writeCountsToDB() to avoid nested 
locking.
+   */
+  public static final ReentrantReadWriteLock FILE_COUNT_WRITE_LOCK = 

Review Comment:
   There are now many locks (local, global, etc.). Please document the lock
   hierarchy clearly in code comments, and add a lock-ordering diagram to the
   PR description.
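As a rough illustration of what such documentation could look like, here is a minimal sketch. The lock names are taken from the diff above, but the ordering shown (TRUNCATE_LOCK before FILE_COUNT_WRITE_LOCK), the class name and the helper method are assumptions made for the example, not what the PR implements.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical sketch of a documented lock hierarchy.
 *
 * Lock order (acquire top to bottom, release in reverse):
 *   1. TRUNCATE_LOCK          guards table truncation
 *   2. FILE_COUNT_WRITE_LOCK  guards read-modify-write flushes
 *
 * The ordering above is an assumption for illustration only.
 */
final class LockOrderSketch {
  private static final Object TRUNCATE_LOCK = new Object();
  private static final ReentrantReadWriteLock FILE_COUNT_WRITE_LOCK =
      new ReentrantReadWriteLock();

  // Stand-in for the shared state that the locks protect.
  private static long total = 0;

  private LockOrderSketch() { }

  /** Adds delta to the shared total while holding both locks in the documented order. */
  static long flushUnderLocks(long delta) {
    synchronized (TRUNCATE_LOCK) {                  // level 1
      FILE_COUNT_WRITE_LOCK.writeLock().lock();     // level 2
      try {
        total += delta;
        return total;
      } finally {
        FILE_COUNT_WRITE_LOCK.writeLock().unlock();
      }
    }
  }
}
```

Keeping the order in one class-level comment, and funnelling acquisition through one helper, makes it easier to spot a code path that takes the locks in the opposite order.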



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to iterate through a table in parallel by breaking table into 
multiple iterators.
+ */
+public class ParallelTableIteratorOperation<K extends Comparable<K>, V> 
implements Closeable {
+  private final Table<K, V> table;
+  private final Codec<K> keyCodec;
+  private final ExecutorService iteratorExecutor;
+  private final ExecutorService valueExecutors;
+  private final int maxNumberOfVals;
+  private final OMMetadataManager metadataManager;
+  private final int maxIteratorTasks;
+  private final int maxWorkerTasks;
+  private final long logCountThreshold;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
+  public ParallelTableIteratorOperation(OMMetadataManager metadataManager, 
Table<K, V> table, Codec<K> keyCodec,
+                                        int iteratorCount, int workerCount, 
int maxNumberOfValsInMemory,
+                                        long logThreshold) {
+    this.table = table;
+    this.keyCodec = keyCodec;
+    this.metadataManager = metadataManager;
+    this.maxIteratorTasks = 2 * iteratorCount;
+    this.maxWorkerTasks = workerCount * 2;
+    this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, 
iteratorCount, 1, TimeUnit.MINUTES,
+                    new ArrayBlockingQueue<>(iteratorCount * 2),
+                    new ThreadPoolExecutor.CallerRunsPolicy());
+    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, 
TimeUnit.MINUTES,
+            new ArrayBlockingQueue<>(workerCount * 2),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / 
(workerCount));
+    this.logCountThreshold = logThreshold;
+  }
+
+
+  private List<K> getBounds(K startKey, K endKey) throws IOException {
+    Set<K> keys = new HashSet<>();
+
+    // Try to get SST file boundaries for optimal segmentation
+    // In test/mock environments, this may not be available
+    try {
+      RDBStore store = (RDBStore) this.metadataManager.getStore();
+      if (store != null && store.getDb() != null) {
+        List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList();
+        String tableName = table.getName();
+
+        // Only filter by column family if table name is available
+        if (tableName != null && !tableName.isEmpty()) {
+          byte[] tableNameBytes = tableName.getBytes(StandardCharsets.UTF_8);
+          for (LiveFileMetaData sstFile : sstFiles) {
+            // Filter SST files by column family to get bounds only for this 
specific table
+            if (Arrays.equals(sstFile.columnFamilyName(), tableNameBytes)) {
+              
keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey()));
+              
keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey()));
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      // If we can't get SST files (test environment, permissions, etc.),
+      // just use empty bounds and rely on fallback path
+      LOG.debug("Unable to retrieve SST file boundaries, will use fallback 
iteration: {}", e.getMessage());
+    }
+
+    if (startKey != null) {
+      keys.add(startKey);
+    }
+    if (endKey != null) {
+      keys.add(endKey);
+    }
+
+    return keys.stream().sorted().filter(Objects::nonNull)
+            .filter(key -> startKey == null || key.compareTo(startKey) >= 0)
+            .filter(key -> endKey == null || endKey.compareTo(key) >= 0)
+            .collect(Collectors.toList());
+  }
+
+  private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize)
+          throws ExecutionException, InterruptedException {
+    while (!futures.isEmpty() && futures.size() > expectedSize) {
+      Future<?> f = futures.poll();
+      f.get();
+    }
+  }
+
+  public void performTaskOnTableVals(String taskName, K startKey, K endKey,
+      Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, 
ExecutionException, InterruptedException {
+    List<K> bounds = getBounds(startKey, endKey);
+    
+    // Fallback for small tables (no SST files yet - data only in memtable)
+    if (bounds.size() < 2) {
+      try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = 
table.iterator()) {
+        if (startKey != null) {
+          iter.seek(startKey);
+        }
+        while (iter.hasNext()) {
+          Table.KeyValue<K, V> kv = iter.next();
+          if (endKey != null && kv.getKey().compareTo(endKey) > 0) {
+            break;
+          }
+          keyOperation.apply(kv);
+        }
+      }
+      return;
+    }
+    
+    Queue<Future<?>> iterFutures = new LinkedList<>();
+    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
+    AtomicLong keyCounter = new AtomicLong();
+    AtomicLong prevLogCounter = new AtomicLong();
+    for (int idx = 1; idx < bounds.size(); idx++) {
+      K beg = bounds.get(idx - 1);
+      K end = bounds.get(idx);
+      boolean inclusive = idx == bounds.size() - 1;
+      waitForQueueSize(iterFutures, maxIteratorTasks - 1);
+      iterFutures.add(iteratorExecutor.submit(() -> {
+        try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter  = 
table.iterator()) {
+          iter.seek(beg);
+          while (iter.hasNext()) {
+            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
+            boolean reachedEnd = false;
+            while (iter.hasNext()) {
+              Table.KeyValue<K, V> kv = iter.next();
+              K key = kv.getKey();
+              
+              // Check if key is within this segment's range
+              boolean withinBounds;
+              if (inclusive) {
+                // Last segment: include everything from beg onwards (or until 
endKey if specified)
+                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+              } else {
+                // Middle segment: include keys in range [beg, end)
+                withinBounds = key.compareTo(end) < 0;
+              }
+              
+              if (withinBounds) {
+                keyValues.add(kv);
+              } else {
+                reachedEnd = true;
+                break;
+              }
+              if (keyValues.size() >= maxNumberOfVals) {
+                break;
+              }
+            }
+            if (!keyValues.isEmpty()) {
+              waitForQueueSize(workerFutures, maxWorkerTasks - 10);
+              workerFutures.add(valueExecutors.submit(() -> {
+                for (Table.KeyValue<K, V> kv : keyValues) {
+                  keyOperation.apply(kv);
+                }
+                keyCounter.addAndGet(keyValues.size());
+                if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
+                  synchronized (keyCounter) {
+                    if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
+                      long cnt = keyCounter.get();
+                      LOG.info("Iterated through {} keys while performing 
task: {}", keyCounter.get(), taskName);
+                      prevLogCounter.set(cnt);
+                    }
+                  }
+                }
+              }));
+            }
+            if (reachedEnd) {
+              break;
+            }
+          }

Review Comment:
   The iterator closes here (end of try-with-resources), but workers
   submitted at line 196 may still be running!
   
     1. Line 165-215: The iterator is open; the iterator thread collects
        KeyValue objects and submits them to workers
     2. Line 215: The iterator closes (try-with-resources auto-close)
     3. Line 196-210: Worker threads are STILL processing the KeyValue objects
     4. Line 230: Wait for iterators (already closed)
     5. Line 231: Wait for workers (but they're using data from closed
        iterators!)
   
     The Fix:
   
     Move line 231 INSIDE the iterator thread (before line 215), so that
     workers finish BEFORE the iterator closes.
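The suggested ordering can be sketched in isolation as follows. This is a simplified stand-in, not the PR's code: `FakeIterator` is a hypothetical placeholder for `TableIterator`, and the worker body only checks whether the iterator was already closed when it ran.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class IteratorLifetimeSketch {
  /** Hypothetical stand-in for TableIterator: only records whether it was closed. */
  static final class FakeIterator implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
      closed.set(true);
    }
  }

  private IteratorLifetimeSketch() { }

  /** Returns how many worker batches ran after the iterator had been closed. */
  static int batchesThatSawClosedIterator(int batches) {
    ExecutorService workers = Executors.newFixedThreadPool(4);
    AtomicInteger sawClosed = new AtomicInteger();
    try (FakeIterator iter = new FakeIterator()) {
      List<Future<?>> segmentFutures = new ArrayList<>();
      for (int i = 0; i < batches; i++) {
        segmentFutures.add(workers.submit(() -> {
          if (iter.closed.get()) {
            sawClosed.incrementAndGet();
          }
        }));
      }
      // Drain this segment's workers BEFORE try-with-resources closes the iterator.
      for (Future<?> f : segmentFutures) {
        f.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      workers.shutdown();
    }
    return sawClosed.get();
  }
}
```

Because every future is awaited inside the try block, no batch can observe a closed iterator. The trade-off is that each iterator thread now blocks on its own workers, which may reduce overlap between segments.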



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -702,145 +702,153 @@ public boolean syncDataFromOM() {
       try {
         long currentSequenceNumber = getCurrentOMDBSequenceNumber();
         LOG.info("Seq number of Recon's OM DB : {}", currentSequenceNumber);
-        boolean fullSnapshot = false;
+        boolean fullSnapshot = true;
+
 
-        if (currentSequenceNumber <= 0) {
-          fullSnapshot = true;
+        if (reInitializeTasksCalled.compareAndSet(false, true)) {
+          LOG.info("Calling reprocess on Recon tasks.");
+          reconTaskController.reInitializeTasks(omMetadataManager, null);
         } else {
-          // Get updates from OM and apply to local Recon OM DB and update 
task status in table
-          deltaReconTaskStatusUpdater.recordRunStart();
-          int loopCount = 0;
-          long fromSequenceNumber = currentSequenceNumber;
-          long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
-          /**
-           * This loop will continue to fetch and apply OM DB updates and with 
every
-           * OM DB fetch request, it will fetch {@code deltaUpdateLimit} count 
of DB updates.
-           * It continues to fetch from OM till the lag, between OM DB WAL 
sequence number
-           * and Recon OM DB snapshot WAL sequence number, is less than this 
lag threshold value.
-           * In high OM write TPS cluster, this simulates continuous pull from 
OM without any delay.
-           */
-          while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) {
-            try (OMDBUpdatesHandler omdbUpdatesHandler =
-                     new OMDBUpdatesHandler(omMetadataManager)) {
-
-              // If interrupt was previously signalled,
-              // we should check for it before starting delta update sync.
-              if (Thread.currentThread().isInterrupted()) {
-                throw new InterruptedException("Thread interrupted during 
delta update.");
-              }
-              diffBetweenOMDbAndReconDBSeqNumber =
-                  getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, 
omdbUpdatesHandler);
-              deltaReconTaskStatusUpdater.setLastTaskRunStatus(0);
-              // Keeping last updated sequence number for both full and delta 
tasks to be same
-              // because sequence number of DB denotes and points to same OM 
DB copy of Recon,
-              // even though two different tasks are updating the DB at 
different conditions, but
-              // it tells the sync state with actual OM DB for the same Recon 
OM DB copy.
-              
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              deltaReconTaskStatusUpdater.recordRunCompletion();
-              fullSnapshotReconTaskUpdater.updateDetails();
-              // Update the current OM metadata manager in task controller
-              reconTaskController.updateOMMetadataManager(omMetadataManager);
-              
-              // Pass on DB update events to tasks that are listening.
-              reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
-                  omdbUpdatesHandler.getEvents(), 
omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
-              
-              // Check if task reinitialization is needed due to buffer 
overflow or task failures
-              boolean bufferOverflowed = 
reconTaskController.hasEventBufferOverflowed();
-              boolean tasksFailed = reconTaskController.hasTasksFailed();
-
-              if (bufferOverflowed || tasksFailed) {
-                ReconTaskReInitializationEvent.ReInitializationReason reason = 
bufferOverflowed ?
-                    
ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW :
-                    
ReconTaskReInitializationEvent.ReInitializationReason.TASK_FAILURES;
-
-                LOG.warn("Detected condition for task reinitialization: {}, 
queueing async reinitialization event",
-                    reason);
-
-                markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
-
-                // Queue async reinitialization event - checkpoint creation 
and retry logic is handled internally
-                ReconTaskController.ReInitializationResult result =
-                    reconTaskController.queueReInitializationEvent(reason);
-
-                //TODO: Create a metric to track this event buffer overflow or 
task failure event
-                boolean triggerFullSnapshot =
-                    Optional.ofNullable(result)
-                        .map(r -> {
-                          switch (r) {
-                          case MAX_RETRIES_EXCEEDED:
-                            LOG.warn(
-                                "Reinitialization queue failures exceeded 
maximum retries, triggering full snapshot " +
-                                    "fallback");
-                            return true;
-
-                          case RETRY_LATER:
-                            LOG.debug("Reinitialization event queueing will be 
retried in next iteration");
-                            return false;
-
-                          default:
-                            LOG.info("Reinitialization event successfully 
queued");
-                            return false;
-                          }
-                        })
-                        .orElseGet(() -> {
-                          LOG.error(
-                              "ReInitializationResult is null, something went 
wrong in queueing reinitialization " +
-                                  "event");
-                          return true;
-                        });
-
-                if (triggerFullSnapshot) {
-                  fullSnapshot = true;
-                }
-              }
-              currentSequenceNumber = getCurrentOMDBSequenceNumber();
-              LOG.debug("Updated current sequence number: {}", 
currentSequenceNumber);
-              loopCount++;
-            } catch (InterruptedException intEx) {
-              LOG.error("OM DB Delta update sync thread was interrupted and 
delta sync failed.");
-              // We are updating the table even if it didn't run i.e. got 
interrupted beforehand
-              // to indicate that a task was supposed to run, but it didn't.
-              markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
-              Thread.currentThread().interrupt();
-              // Since thread is interrupted, we do not fall back to snapshot 
sync.
-              // Return with sync failed status.
-              return false;
-            } catch (Exception e) {
-              markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
-              LOG.warn("Unable to get and apply delta updates from OM: {}, 
falling back to full snapshot",
-                  e.getMessage());
-              fullSnapshot = true;
-            }
-            if (fullSnapshot) {
-              break;
-            }
-          }
-          LOG.info("Delta updates received from OM : {} loops, {} records", 
loopCount,
-              getCurrentOMDBSequenceNumber() - fromSequenceNumber);
+          LOG.info("reInitializeTasks already called once; skipping.");
         }
 
-        if (fullSnapshot) {
-          try {
-            executeFullSnapshot(fullSnapshotReconTaskUpdater, 
deltaReconTaskStatusUpdater);
-          } catch (InterruptedException intEx) {
-            LOG.error("OM DB Snapshot update sync thread was interrupted.");
-            fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
-            fullSnapshotReconTaskUpdater.recordRunCompletion();
-            Thread.currentThread().interrupt();
-            // Mark sync status as failed.
-            return false;
-          } catch (Exception e) {
-            metrics.incrNumSnapshotRequestsFailed();
-            fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
-            fullSnapshotReconTaskUpdater.recordRunCompletion();
-            LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
-            // Update health status in ReconContext
-            reconContext.updateHealthStatus(new AtomicBoolean(false));
-            
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
-          }
-        }
+//        if (currentSequenceNumber <= 0) {

Review Comment:
   This code is commented out again. Please check.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to iterate through a table in parallel by breaking table into 
multiple iterators.
+ */
+public class ParallelTableIteratorOperation<K extends Comparable<K>, V> 
implements Closeable {
+  private final Table<K, V> table;
+  private final Codec<K> keyCodec;
+  private final ExecutorService iteratorExecutor;
+  private final ExecutorService valueExecutors;
+  private final int maxNumberOfVals;
+  private final OMMetadataManager metadataManager;
+  private final int maxIteratorTasks;
+  private final int maxWorkerTasks;
+  private final long logCountThreshold;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
+  public ParallelTableIteratorOperation(OMMetadataManager metadataManager, 
Table<K, V> table, Codec<K> keyCodec,
+                                        int iteratorCount, int workerCount, 
int maxNumberOfValsInMemory,
+                                        long logThreshold) {
+    this.table = table;
+    this.keyCodec = keyCodec;
+    this.metadataManager = metadataManager;
+    this.maxIteratorTasks = 2 * iteratorCount;
+    this.maxWorkerTasks = workerCount * 2;
+    this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, 
iteratorCount, 1, TimeUnit.MINUTES,
+                    new ArrayBlockingQueue<>(iteratorCount * 2));
+    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, 
TimeUnit.MINUTES,
+            new ArrayBlockingQueue<>(workerCount * 2));
+    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / 
(workerCount));
+    this.logCountThreshold = logThreshold;
+  }
+
+
+  private List<K> getBounds(K startKey, K endKey) throws IOException {
+    RDBStore store = (RDBStore) this.metadataManager.getStore();
+    List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList();
+    Set<K> keys = new HashSet<>();
+    String tableName = table.getName();
+    byte[] tableNameBytes = tableName.getBytes(StandardCharsets.UTF_8);
+    for (LiveFileMetaData sstFile : sstFiles) {
+      // Filter SST files by column family to get bounds only for this 
specific table
+      if (Arrays.equals(sstFile.columnFamilyName(), tableNameBytes)) {
+        keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey()));
+        keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey()));
+      }
+    }
+    if (startKey != null) {
+      keys.add(startKey);
+    }
+    if (endKey != null) {
+      keys.add(endKey);
+    }
+
+    return keys.stream().sorted().filter(Objects::nonNull)
+            .filter(key -> startKey == null || key.compareTo(startKey) >= 0)
+            .filter(key -> endKey == null || endKey.compareTo(key) >= 0)
+            .collect(Collectors.toList());
+  }
+
+  private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize)
+          throws ExecutionException, InterruptedException {
+    while (!futures.isEmpty() && futures.size() > expectedSize) {
+      Future<?> f = futures.poll();
+      f.get();
+    }
+  }
+
+  public void performTaskOnTableVals(String taskName, K startKey, K endKey,
+      Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, 
ExecutionException, InterruptedException {
+    List<K> bounds = getBounds(startKey, endKey);
+    
+    // Fallback for small tables (no SST files yet - data only in memtable)
+    if (bounds.size() < 2) {
+      try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = 
table.iterator()) {
+        if (startKey != null) {
+          iter.seek(startKey);
+        }
+        while (iter.hasNext()) {
+          Table.KeyValue<K, V> kv = iter.next();
+          if (endKey != null && kv.getKey().compareTo(endKey) > 0) {
+            break;
+          }
+          keyOperation.apply(kv);
+        }
+      }
+      return;
+    }
+    
+    Queue<Future<?>> iterFutures = new LinkedList<>();
+    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
+    AtomicLong keyCounter = new AtomicLong();
+    AtomicLong prevLogCounter = new AtomicLong();
+    for (int idx = 1; idx < bounds.size(); idx++) {
+      K beg = bounds.get(idx - 1);
+      K end = bounds.get(idx);
+      boolean inclusive = idx == bounds.size() - 1;
+      waitForQueueSize(iterFutures, maxIteratorTasks - 1);
+      iterFutures.add(iteratorExecutor.submit(() -> {
+        try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter  = 
table.iterator()) {
+          iter.seek(beg);
+          while (iter.hasNext()) {
+            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
+            boolean reachedEnd = false;
+            while (iter.hasNext()) {
+              Table.KeyValue<K, V> kv = iter.next();
+              K key = kv.getKey();
+              
+              // Check if key is within this segment's range
+              boolean withinBounds;
+              if (inclusive) {
+                // Last segment: include everything from beg onwards (or until 
endKey if specified)
+                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+              } else {
+                // Middle segment: include keys in range [beg, end)
+                withinBounds = key.compareTo(end) < 0;
+              }
+              
+              if (withinBounds) {
+                keyValues.add(kv);
+              } else {
+                reachedEnd = true;
+                break;
+              }
+              if (keyValues.size() >= maxNumberOfVals) {
+                break;
+              }
+            }
+            if (!keyValues.isEmpty()) {
+              waitForQueueSize(workerFutures, maxWorkerTasks - 10);
+              workerFutures.add(valueExecutors.submit(() -> {
+                for (Table.KeyValue<K, V> kv : keyValues) {
+                  keyOperation.apply(kv);
+                }
+                keyCounter.addAndGet(keyValues.size());
+                if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
+                  synchronized (keyCounter) {
+                    if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
+                      long cnt = keyCounter.get();
+                      LOG.info("Iterated through {} keys while performing 
task: {}", keyCounter.get(), taskName);
+                      prevLogCounter.set(cnt);
+                    }
+                  }
+                }
+              }));
+            }
+            if (reachedEnd) {
+              break;
+            }
+          }
+        } catch (IOException | ExecutionException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }));
+    }
+    waitForQueueSize(iterFutures, 0);
+    waitForQueueSize(workerFutures, 0);
+    
+    // Log final stats
+    LOG.info("{}: Parallel iteration completed - Total keys processed: {}", 
taskName, keyCounter.get());
+  }
+
+  @Override
+  public void close() throws IOException {
+    iteratorExecutor.shutdown();

Review Comment:
   Thanks for including this suggestion, but I gave 60 seconds only as an
   example.
   
   **60 seconds may not be enough**
   
     For example, suppose you are reprocessing 10 million keys on a large cluster:
     - At line 242: Iterator threads are finishing up (fast, usually < 10 
seconds)
     - At line 245: Worker threads are still writing to RocksDB
     - Writing to RocksDB can be slow if:
       - Disk is slow
       - RocksDB is compacting
       - Lock contention (waiting for FILE_COUNT_WRITE_LOCK)
   
     If workers take longer than 60 seconds:
     - Line 246: shutdownNow() forcefully interrupts threads
     - Result: Partial data written, corrupted state, lost progress
   
     Example Timeline:
     T=0s:   Start processing 10M keys
     T=30s:  Iterators finish reading all keys, submit to workers
     T=35s:  close() is called because Recon is shut down, manually or
             unintentionally; iterators shut down cleanly
     T=40s:  Workers are still processing
     T=95s:  Workers have finished 8M keys; 2M are left and still flushing to DB
     T=100s: TIMEOUT! shutdownNow() is called
             2M keys lost!
             Partial flush may corrupt the DB!
   
     The Fix:
   
     Make timeout configurable and increase default:
   
     ```
   // Line 67: Add configurable timeout (in constructor)
     private final long shutdownTimeoutSeconds;
   
     public ParallelTableIteratorOperation(..., long shutdownTimeoutSeconds) {
       // ... existing code ...
       this.shutdownTimeoutSeconds = shutdownTimeoutSeconds;
     }
   
     // Line 242-246: Use configurable timeout
     if (!iteratorExecutor.awaitTermination(shutdownTimeoutSeconds, 
TimeUnit.SECONDS)) {
       LOG.warn("Iterator executor did not terminate within {} seconds, forcing 
shutdown",
                shutdownTimeoutSeconds);
       iteratorExecutor.shutdownNow();
     }
   
     if (!valueExecutors.awaitTermination(shutdownTimeoutSeconds, 
TimeUnit.SECONDS)) {
       LOG.warn("Value executor did not terminate within {} seconds, forcing 
shutdown",
                shutdownTimeoutSeconds);
       valueExecutors.shutdownNow();
     }
   
   ```
   
   **And in the caller, pass a longer timeout:**
   
   ```
     // In ContainerKeyMapperHelper or FileSizeCountTaskHelper:
     try (ParallelTableIteratorOperation<String, OmKeyInfo> parallelIter =
            new ParallelTableIteratorOperation<>(
              omMetadataManager,
              table,
              new StringCodec(),
              maxIterators,
              maxWorkers,
              maxKeysInMemory,
              logThreshold,
              300  // 5 minutes instead of 60 seconds
            )) {
       // ... use it ...
     }  // close() will wait up to 5 minutes
   ```



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java:
##########
@@ -162,6 +162,21 @@ public final class  ReconServerConfigKeys {
   public static final long
       OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 
1000L;
 
+  public static final String
+      OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS = 
"ozone.recon.task.reprocess.max.iterators";
+
+  public static final int OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT = 5;
+
+  public static final String
+      OZONE_RECON_TASK_REPROCESS_MAX_WORKERS = 
"ozone.recon.task.reprocess.max.workers";
+
+  public static final int OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT = 20;
+
+  public static final String
+      OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY = 
"ozone.recon.task.reprocess.max.keys.in.memory";
+
+  public static final int 
OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT = 2000;
+

Review Comment:
   Please add comments explaining when and how to tune these settings.
   
   Also, please provide some guidance in the javadoc here on memory
   consumption, with an example along these lines:
   
    Resource calculation:
     - FSO + OBS tasks running concurrently = 10 iterators + 40 workers =
       50 threads
     - Plus caller thread pools = potentially 100+ threads during reprocess
     - Memory: 2000 keys × ~500 bytes/key × 2 tasks = ~2MB (reasonable)
   
   This could also go into the ozone-default.xml documentation, together
   with tuning guidelines.
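   
   The arithmetic above can be sketched as a small helper. This is only an
   illustration: `ReprocessResourceEstimate` and its method names are
   hypothetical, and the ~500 bytes/key figure is the rough assumption from
   the example, not a measured value.

```java
// Hypothetical helper, not part of the PR: back-of-the-envelope sizing only.
final class ReprocessResourceEstimate {
  private ReprocessResourceEstimate() { }

  /** Threads used by the parallel iterators of all concurrently running tasks. */
  static int totalThreads(int concurrentTasks, int maxIterators, int maxWorkers) {
    return concurrentTasks * (maxIterators + maxWorkers);
  }

  /** Approximate bytes buffered in memory; bytesPerKey is an assumed average. */
  static long bufferedBytes(int concurrentTasks, int maxKeysInMemory, long bytesPerKey) {
    return (long) concurrentTasks * maxKeysInMemory * bytesPerKey;
  }

  public static void main(String[] args) {
    // Defaults from this diff: 5 iterators, 20 workers, 2000 keys in memory.
    // Two tasks (FSO + OBS) are assumed to run at the same time.
    System.out.println("threads = " + totalThreads(2, 5, 20));
    System.out.println("bytes   = " + bufferedBytes(2, 2000, 500L));
  }
}
```

   Doubling `max.workers` doubles the worker share of the thread count, while
   `max.keys.in.memory` only scales the buffered bytes, so the two can be
   tuned separately.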



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -134,31 +170,80 @@ public static boolean reprocessBucketLayout(BucketLayout 
bucketLayout,
                                               OMMetadataManager 
omMetadataManager,
                                               Map<FileSizeCountKey, Long> 
fileSizeCountMap,
                                               ReconFileMetadataManager 
reconFileMetadataManager,
-                                              String taskName) {
+                                              String taskName,
+                                              int maxIterators,
+                                              int maxWorkers,
+                                              int maxKeysInMemory) {
+    LOG.info("{}: Starting parallel iteration with {} iterators, {} workers 
for bucket layout {}",
+        taskName, maxIterators, maxWorkers, bucketLayout);
     Table<String, OmKeyInfo> omKeyInfoTable = 
omMetadataManager.getKeyTable(bucketLayout);
-    int totalKeysProcessed = 0;
+    long startTime = Time.monotonicNow();
+    
+    // LOCAL lock (task-specific) - coordinates worker threads within this 
task only
+    // Protects: in-memory fileSizeCountMap from concurrent access by workers 
in THIS task
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    // Flag to coordinate flush attempts - prevents all threads from queuing 
for write lock
+    AtomicBoolean isFlushingInProgress = new AtomicBoolean(false);
+    final int FLUSH_THRESHOLD = 100000;
     
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyIter =
-             omKeyInfoTable.iterator()) {
-      while (keyIter.hasNext()) {
-        Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+    // Use parallel table iteration
+    Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+      try {
+        lock.readLock().lock();
         handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
-        totalKeysProcessed++;
-
-        // Flush to RocksDB periodically.
-        if (fileSizeCountMap.size() >= 100000) {
-          // For reprocess, we don't need to check existing values since table 
was truncated
-          LOG.debug("Flushing {} accumulated counts to RocksDB for {}", 
fileSizeCountMap.size(), taskName);
-          writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
-          fileSizeCountMap.clear();
+      } finally {
+        lock.readLock().unlock();
+      }
+      
+      // Only one thread should attempt flush to avoid blocking all workers
+      if (fileSizeCountMap.size() >= FLUSH_THRESHOLD &&
+          isFlushingInProgress.compareAndSet(false, true)) {
+        try {
+          // Step 1: Acquire LOCAL lock (task-specific) to stop worker threads 
in THIS task
+          lock.writeLock().lock();
+          try {
+            // Double-check after acquiring write lock
+            if (fileSizeCountMap.size() >= FLUSH_THRESHOLD) {
+              LOG.debug("Flushing {} accumulated counts to RocksDB for {}", 
fileSizeCountMap.size(), taskName);
+              // Step 2: Acquire GLOBAL lock (cross-task) to protect DB from 
concurrent FSO/OBS/Legacy writes
+              FILE_COUNT_WRITE_LOCK.writeLock().lock();

Review Comment:
   Better move this lock up, immediately after `lock.writeLock().lock();`
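   
   A minimal sketch of the ordering I have in mind, assuming the flush always
   takes the local lock first and the global lock right after it. The class
   `FlushLockOrderSketch`, `GLOBAL_LOCK` and the plain maps are stand-ins for
   illustration, not the PR's actual types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the flush path; names are illustrative only.
final class FlushLockOrderSketch {
  // Stand-in for FILE_COUNT_WRITE_LOCK (shared across tasks).
  static final ReentrantReadWriteLock GLOBAL_LOCK = new ReentrantReadWriteLock();

  private FlushLockOrderSketch() { }

  /** Merges the in-memory counts into db and returns how many entries were flushed. */
  static int flush(ReentrantReadWriteLock localLock,
                   Map<String, Long> counts, Map<String, Long> db) {
    localLock.writeLock().lock();          // 1. LOCAL lock: pause this task's workers
    try {
      GLOBAL_LOCK.writeLock().lock();      // 2. GLOBAL lock: taken immediately after
      try {
        int flushed = counts.size();
        counts.forEach((k, v) -> db.merge(k, v, Long::sum));
        counts.clear();
        return flushed;
      } finally {
        GLOBAL_LOCK.writeLock().unlock();  // release in reverse order
      }
    } finally {
      localLock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    Map<String, Long> counts = new ConcurrentHashMap<>();
    counts.put("bin-1KB", 3L);
    Map<String, Long> db = new ConcurrentHashMap<>();
    int flushed = flush(new ReentrantReadWriteLock(), counts, db);
    System.out.println("flushed " + flushed + " entries");
  }
}
```

   With both acquisitions next to each other, the local -> global order is
   visible in one place and every path releases the locks in reverse order.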



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -105,24 +120,45 @@ public static void 
truncateFileCountTableIfNeeded(ReconFileMetadataManager recon
   public static ReconOmTask.TaskResult reprocess(OMMetadataManager 
omMetadataManager,
                                                  ReconFileMetadataManager 
reconFileMetadataManager,
                                                  BucketLayout bucketLayout,
-                                                 String taskName) {
-    LOG.info("Starting RocksDB Reprocess for {}", taskName);
-    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
-    long startTime = Time.monotonicNow();
+                                                 String taskName,
+                                                 int maxIterators,
+                                                 int maxWorkers,
+                                                 int maxKeysInMemory) {
+    LOG.info("{}: Starting parallel RocksDB reprocess with {} iterators, {} 
workers for bucket layout {}",
+        taskName, maxIterators, maxWorkers, bucketLayout);
+    Map<FileSizeCountKey, Long> fileSizeCountMap = new ConcurrentHashMap<>();
+    long overallStartTime = Time.monotonicNow();
     
     // Ensure the file count table is truncated only once during reprocess
     truncateFileCountTableIfNeeded(reconFileMetadataManager, taskName);
     
+    long iterationStartTime = Time.monotonicNow();
     boolean status = reprocessBucketLayout(
-        bucketLayout, omMetadataManager, fileSizeCountMap, 
reconFileMetadataManager, taskName);
+        bucketLayout, omMetadataManager, fileSizeCountMap, 
reconFileMetadataManager, taskName,
+        maxIterators, maxWorkers, maxKeysInMemory);
     if (!status) {
       return buildTaskResult(taskName, false);
     }
+    long iterationEndTime = Time.monotonicNow();
     
-    writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
+    long writeStartTime = Time.monotonicNow();
+    // Acquire GLOBAL lock (cross-task) before writing to DB
+    FILE_COUNT_WRITE_LOCK.writeLock().lock();

Review Comment:
   Should move this lock inside try block.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to iterate through a table in parallel by breaking table into 
multiple iterators.
+ */
+public class ParallelTableIteratorOperation<K extends Comparable<K>, V> 
implements Closeable {
+  private final Table<K, V> table;
+  private final Codec<K> keyCodec;
+  private final ExecutorService iteratorExecutor;
+  private final ExecutorService valueExecutors;
+  private final int maxNumberOfVals;
+  private final OMMetadataManager metadataManager;
+  private final int maxIteratorTasks;
+  private final int maxWorkerTasks;
+  private final long logCountThreshold;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
+  public ParallelTableIteratorOperation(OMMetadataManager metadataManager, 
Table<K, V> table, Codec<K> keyCodec,
+                                        int iteratorCount, int workerCount, 
int maxNumberOfValsInMemory,
+                                        long logThreshold) {
+    this.table = table;
+    this.keyCodec = keyCodec;
+    this.metadataManager = metadataManager;
+    this.maxIteratorTasks = 2 * iteratorCount;
+    this.maxWorkerTasks = workerCount * 2;
+    this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, 
iteratorCount, 1, TimeUnit.MINUTES,
+                    new ArrayBlockingQueue<>(iteratorCount * 2),
+                    new ThreadPoolExecutor.CallerRunsPolicy());
+    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, 
TimeUnit.MINUTES,
+            new ArrayBlockingQueue<>(workerCount * 2),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / 
(workerCount));
+    this.logCountThreshold = logThreshold;
+  }
+
+
+  private List<K> getBounds(K startKey, K endKey) throws IOException {
+    Set<K> keys = new HashSet<>();
+
+    // Try to get SST file boundaries for optimal segmentation
+    // In test/mock environments, this may not be available
+    try {
+      RDBStore store = (RDBStore) this.metadataManager.getStore();
+      if (store != null && store.getDb() != null) {
+        List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList();
+        String tableName = table.getName();
+
+        // Only filter by column family if table name is available
+        if (tableName != null && !tableName.isEmpty()) {
+          byte[] tableNameBytes = tableName.getBytes(StandardCharsets.UTF_8);
+          for (LiveFileMetaData sstFile : sstFiles) {
+            // Filter SST files by column family to get bounds only for this 
specific table
+            if (Arrays.equals(sstFile.columnFamilyName(), tableNameBytes)) {
+              
keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey()));
+              
keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey()));
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      // If we can't get SST files (test environment, permissions, etc.),
+      // just use empty bounds and rely on fallback path
+      LOG.debug("Unable to retrieve SST file boundaries, will use fallback 
iteration: {}", e.getMessage());
+    }
+
+    if (startKey != null) {
+      keys.add(startKey);
+    }
+    if (endKey != null) {
+      keys.add(endKey);
+    }
+
+    return keys.stream().sorted().filter(Objects::nonNull)
+            .filter(key -> startKey == null || key.compareTo(startKey) >= 0)
+            .filter(key -> endKey == null || endKey.compareTo(key) >= 0)
+            .collect(Collectors.toList());
+  }
+
+  private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize)
+          throws ExecutionException, InterruptedException {
+    while (!futures.isEmpty() && futures.size() > expectedSize) {
+      Future<?> f = futures.poll();
+      f.get();
+    }
+  }
+
+  public void performTaskOnTableVals(String taskName, K startKey, K endKey,
+      Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, 
ExecutionException, InterruptedException {
+    List<K> bounds = getBounds(startKey, endKey);
+    
+    // Fallback for small tables (no SST files yet - data only in memtable)
+    if (bounds.size() < 2) {
+      try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = 
table.iterator()) {
+        if (startKey != null) {
+          iter.seek(startKey);
+        }
+        while (iter.hasNext()) {
+          Table.KeyValue<K, V> kv = iter.next();
+          if (endKey != null && kv.getKey().compareTo(endKey) > 0) {
+            break;
+          }
+          keyOperation.apply(kv);
+        }
+      }
+      return;
+    }
+    
+    Queue<Future<?>> iterFutures = new LinkedList<>();
+    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
+    AtomicLong keyCounter = new AtomicLong();
+    AtomicLong prevLogCounter = new AtomicLong();
+    for (int idx = 1; idx < bounds.size(); idx++) {
+      K beg = bounds.get(idx - 1);
+      K end = bounds.get(idx);
+      boolean inclusive = idx == bounds.size() - 1;
+      waitForQueueSize(iterFutures, maxIteratorTasks - 1);
+      iterFutures.add(iteratorExecutor.submit(() -> {
+        try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter  = 
table.iterator()) {
+          iter.seek(beg);
+          while (iter.hasNext()) {
+            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
+            boolean reachedEnd = false;
+            while (iter.hasNext()) {
+              Table.KeyValue<K, V> kv = iter.next();
+              K key = kv.getKey();
+              
+              // Check if key is within this segment's range
+              boolean withinBounds;
+              if (inclusive) {
+                // Last segment: include everything from beg onwards (or until 
endKey if specified)
+                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+              } else {
+                // Middle segment: include keys in range [beg, end)
+                withinBounds = key.compareTo(end) < 0;
+              }
+              
+              if (withinBounds) {
+                keyValues.add(kv);
+              } else {
+                reachedEnd = true;
+                break;
+              }
+              if (keyValues.size() >= maxNumberOfVals) {
+                break;
+              }
+            }
+            if (!keyValues.isEmpty()) {
+              waitForQueueSize(workerFutures, maxWorkerTasks - 10);
+              workerFutures.add(valueExecutors.submit(() -> {
+                for (Table.KeyValue<K, V> kv : keyValues) {
+                  keyOperation.apply(kv);
+                }
+                keyCounter.addAndGet(keyValues.size());
+                if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
+                  synchronized (keyCounter) {
+                    if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
+                      long cnt = keyCounter.get();
+                      LOG.info("Iterated through {} keys while performing 
task: {}", keyCounter.get(), taskName);
+                      prevLogCounter.set(cnt);
+                    }
+                  }
+                }
+              }));
+            }
+            if (reachedEnd) {
+              break;
+            }
+          }
+        } catch (IOException e) {

Review Comment:
   FYI: the exception handling here covers the iterator thread only, not the
   worker threads.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to iterate through a table in parallel by breaking table into 
multiple iterators.
+ */
+public class ParallelTableIteratorOperation<K extends Comparable<K>, V> 
implements Closeable {
+  private final Table<K, V> table;
+  private final Codec<K> keyCodec;
+  private final ExecutorService iteratorExecutor;
+  private final ExecutorService valueExecutors;
+  private final int maxNumberOfVals;
+  private final OMMetadataManager metadataManager;
+  private final int maxIteratorTasks;
+  private final int maxWorkerTasks;
+  private final long logCountThreshold;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
+  public ParallelTableIteratorOperation(OMMetadataManager metadataManager, 
Table<K, V> table, Codec<K> keyCodec,
+                                        int iteratorCount, int workerCount, 
int maxNumberOfValsInMemory,
+                                        long logThreshold) {
+    this.table = table;
+    this.keyCodec = keyCodec;
+    this.metadataManager = metadataManager;
+    this.maxIteratorTasks = 2 * iteratorCount;
+    this.maxWorkerTasks = workerCount * 2;
+    this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, 
iteratorCount, 1, TimeUnit.MINUTES,
+                    new ArrayBlockingQueue<>(iteratorCount * 2),
+                    new ThreadPoolExecutor.CallerRunsPolicy());

Review Comment:
   Both the iterator executor and the value executors now use
   `CallerRunsPolicy`, which runs a rejected task on the submitting thread
   when the queue is full.
   
   So if the worker queue fills up, the worker task runs on the iterator
   thread, which:
     - Slows down iteration (the iterator thread is busy doing a worker's job)
     - Hides the fact that the system is overloaded
     - Makes performance unpredictable
   
     Better approach: use AbortPolicy and handle RejectedExecutionException
     explicitly, or increase the queue sizes.
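   
   For reference, a small self-contained sketch of what explicit rejection
   handling could look like. `AbortPolicySketch` is a hypothetical name, and
   the pool sizes (1 thread, queue of 1) are chosen only to force a rejection.
   Real code would likely log and retry or back off instead of just counting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the PR.
final class AbortPolicySketch {
  private AbortPolicySketch() { }

  /** Submits three tasks to a saturated pool and returns {completed, rejected}. */
  static int[] run() throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
    CountDownLatch release = new CountDownLatch(1);
    AtomicInteger completed = new AtomicInteger();
    int rejected = 0;
    for (int i = 0; i < 3; i++) {
      try {
        pool.submit(() -> {
          try {
            release.await();              // keep the single worker busy
            completed.incrementAndGet();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      } catch (RejectedExecutionException e) {
        // Overload is now visible to the submitter instead of being hidden.
        rejected++;
      }
    }
    release.countDown();
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return new int[] {completed.get(), rejected};
  }

  public static void main(String[] args) throws InterruptedException {
    int[] result = run();
    System.out.println("completed=" + result[0] + " rejected=" + result[1]);
  }
}
```

   The first task occupies the only thread, the second fills the queue, and
   the third is rejected, so the overload shows up as an exception at the
   submit call rather than as a slow iterator thread.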



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to iterate through a table in parallel by breaking table into 
multiple iterators.
+ */
+public class ParallelTableIteratorOperation<K extends Comparable<K>, V> 
implements Closeable {
+  private final Table<K, V> table;
+  private final Codec<K> keyCodec;
+  private final ExecutorService iteratorExecutor;
+  private final ExecutorService valueExecutors;
+  private final int maxNumberOfVals;
+  private final OMMetadataManager metadataManager;
+  private final int maxIteratorTasks;
+  private final int maxWorkerTasks;
+  private final long logCountThreshold;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
+  public ParallelTableIteratorOperation(OMMetadataManager metadataManager, 
Table<K, V> table, Codec<K> keyCodec,
+                                        int iteratorCount, int workerCount, 
int maxNumberOfValsInMemory,
+                                        long logThreshold) {
+    this.table = table;
+    this.keyCodec = keyCodec;
+    this.metadataManager = metadataManager;
+    this.maxIteratorTasks = 2 * iteratorCount;
+    this.maxWorkerTasks = workerCount * 2;
+    this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, 
iteratorCount, 1, TimeUnit.MINUTES,
+                    new ArrayBlockingQueue<>(iteratorCount * 2),
+                    new ThreadPoolExecutor.CallerRunsPolicy());
+    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, 
TimeUnit.MINUTES,
+            new ArrayBlockingQueue<>(workerCount * 2),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / 
(workerCount));
+    this.logCountThreshold = logThreshold;
+  }
+
+
+  private List<K> getBounds(K startKey, K endKey) throws IOException {
+    Set<K> keys = new HashSet<>();
+
+    // Try to get SST file boundaries for optimal segmentation
+    // In test/mock environments, this may not be available
+    try {
+      RDBStore store = (RDBStore) this.metadataManager.getStore();
+      if (store != null && store.getDb() != null) {
+        List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList();
+        String tableName = table.getName();
+
+        // Only filter by column family if table name is available
+        if (tableName != null && !tableName.isEmpty()) {
+          byte[] tableNameBytes = tableName.getBytes(StandardCharsets.UTF_8);
+          for (LiveFileMetaData sstFile : sstFiles) {
+            // Filter SST files by column family to get bounds only for this 
specific table
+            if (Arrays.equals(sstFile.columnFamilyName(), tableNameBytes)) {
+              
keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey()));
+              
keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey()));
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      // If we can't get SST files (test environment, permissions, etc.),
+      // just use empty bounds and rely on fallback path
+      LOG.debug("Unable to retrieve SST file boundaries, will use fallback 
iteration: {}", e.getMessage());
+    }
+
+    if (startKey != null) {
+      keys.add(startKey);
+    }
+    if (endKey != null) {
+      keys.add(endKey);
+    }
+
+    return keys.stream().sorted().filter(Objects::nonNull)
+            .filter(key -> startKey == null || key.compareTo(startKey) >= 0)
+            .filter(key -> endKey == null || endKey.compareTo(key) >= 0)
+            .collect(Collectors.toList());
+  }
+
+  private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize)
+          throws ExecutionException, InterruptedException {
+    while (!futures.isEmpty() && futures.size() > expectedSize) {
+      Future<?> f = futures.poll();
+      f.get();
+    }
+  }
+
+  public void performTaskOnTableVals(String taskName, K startKey, K endKey,
+      Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, 
ExecutionException, InterruptedException {
+    List<K> bounds = getBounds(startKey, endKey);
+    
+    // Fallback for small tables (no SST files yet - data only in memtable)
+    if (bounds.size() < 2) {
+      try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = 
table.iterator()) {
+        if (startKey != null) {
+          iter.seek(startKey);
+        }
+        while (iter.hasNext()) {
+          Table.KeyValue<K, V> kv = iter.next();
+          if (endKey != null && kv.getKey().compareTo(endKey) > 0) {
+            break;
+          }
+          keyOperation.apply(kv);
+        }
+      }
+      return;
+    }
+    
+    Queue<Future<?>> iterFutures = new LinkedList<>();
+    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
+    AtomicLong keyCounter = new AtomicLong();
+    AtomicLong prevLogCounter = new AtomicLong();
+    for (int idx = 1; idx < bounds.size(); idx++) {
+      K beg = bounds.get(idx - 1);
+      K end = bounds.get(idx);
+      boolean inclusive = idx == bounds.size() - 1;
+      waitForQueueSize(iterFutures, maxIteratorTasks - 1);
+      iterFutures.add(iteratorExecutor.submit(() -> {
+        try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter  = 
table.iterator()) {
+          iter.seek(beg);
+          while (iter.hasNext()) {
+            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
+            boolean reachedEnd = false;
+            while (iter.hasNext()) {
+              Table.KeyValue<K, V> kv = iter.next();
+              K key = kv.getKey();
+              
+              // Check if key is within this segment's range
+              boolean withinBounds;
+              if (inclusive) {
+                // Last segment: include everything from beg onwards (or until 
endKey if specified)
+                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+              } else {
+                // Middle segment: include keys in range [beg, end)
+                withinBounds = key.compareTo(end) < 0;
+              }
+              
+              if (withinBounds) {
+                keyValues.add(kv);
+              } else {
+                reachedEnd = true;
+                break;
+              }
+              if (keyValues.size() >= maxNumberOfVals) {
+                break;
+              }
+            }
+            if (!keyValues.isEmpty()) {
+              waitForQueueSize(workerFutures, maxWorkerTasks - 10);
+              workerFutures.add(valueExecutors.submit(() -> {
+                for (Table.KeyValue<K, V> kv : keyValues) {
+                  keyOperation.apply(kv);

Review Comment:
   What if this throws an exception?
   There is no try-catch here. If keyOperation.apply() throws, the worker
   task fails without logging anything.
   
    If keyOperation.apply(kv) at line 198 throws an exception:
     - The worker task ends without any log output
     - The exception is stored in the Future object
     - You only see it if you call future.get()
     - The current code at line 231 does call
       waitForQueueSize(workerFutures, 0), which calls get(), so the
       exception WILL be surfaced
     - However, if 10 workers fail, you only see the FIRST exception (the
       other 9 are lost)
   
   Can you verify this behavior, i.e. whether the whole reprocess for that
   task fails and the exception propagates?
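   
   To illustrate the Future behavior described above, here is a small sketch
   that drains every future and keeps the later failures as suppressed
   exceptions instead of dropping them. `WorkerFailureSketch`, `drain` and
   `demo` are hypothetical names, and this is one possible approach, not a
   claim about how the PR should be structured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the PR.
final class WorkerFailureSketch {
  private WorkerFailureSketch() { }

  /**
   * Waits for every future. Returns null if all succeeded, otherwise an
   * exception wrapping the first failure with the others attached as suppressed.
   */
  static Exception drain(List<Future<?>> futures) throws InterruptedException {
    Exception first = null;
    for (Future<?> f : futures) {
      try {
        f.get();  // a failure inside the task surfaces here, and only here
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (first == null) {
          first = new RuntimeException("worker failed", cause);
        } else {
          first.addSuppressed(cause);
        }
      }
    }
    return first;
  }

  /** Runs ten tasks, five of which throw, and returns the aggregated failure. */
  static Exception demo() throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      final int id = i;
      futures.add(pool.submit(() -> {
        if (id % 2 == 1) {
          throw new IllegalStateException("key " + id);
        }
      }));
    }
    pool.shutdown();
    return drain(futures);
  }

  public static void main(String[] args) throws InterruptedException {
    Exception failure = demo();
    System.out.println(failure.getCause().getMessage()
        + " plus " + failure.getSuppressed().length + " suppressed");
  }
}
```

   Nothing is printed or thrown by the pool itself when a task fails; the
   failures only become visible because `drain` calls `get()` on every future.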



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -43,14 +47,26 @@ public abstract class FileSizeCountTaskHelper {
   // Static lock object for table truncation synchronization
   private static final Object TRUNCATE_LOCK = new Object();
 
+  /**
+   * GLOBAL lock for cross-task synchronization during DB writes.
+   * 
+   * Scope: Shared across ALL tasks (FSO, OBS, Legacy)
+   * Protects: RocksDB read-modify-write operations
+   * Purpose: Ensures atomic updates when multiple tasks flush concurrently to 
the same bins
+   * 
+   * IMPORTANT: Callers MUST acquire this lock before calling 
writeCountsToDB().
+   * This lock should NOT be acquired inside writeCountsToDB() to avoid nested 
locking.
+   */
+  public static final ReentrantReadWriteLock FILE_COUNT_WRITE_LOCK = 
+      new ReentrantReadWriteLock();

Review Comment:
   Better to use fair locking here. Once one task (OBS or FSO) starts taking
   the lock from its many worker threads, the other task may not get
   `FILE_COUNT_WRITE_LOCK` easily, so thread starvation can happen.
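   
   A minimal sketch of the fair variant, using the standard
   `ReentrantReadWriteLock(boolean fair)` constructor. `FairLockSketch`,
   `FAIR_WRITE_LOCK` and `guardedWrite` are hypothetical names for
   illustration; fair mode generally trades some throughput for more
   predictable ordering, so it is worth measuring.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; not part of the PR.
final class FairLockSketch {
  // Passing true requests the fair ordering policy, under which waiting
  // threads acquire the lock in approximately arrival order.
  static final ReentrantReadWriteLock FAIR_WRITE_LOCK = new ReentrantReadWriteLock(true);

  private FairLockSketch() { }

  /** Stand-in for a read-modify-write on a DB bin, done under the write lock. */
  static long guardedWrite(long current, long delta) {
    FAIR_WRITE_LOCK.writeLock().lock();
    try {
      return current + delta;
    } finally {
      FAIR_WRITE_LOCK.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    System.out.println("fair=" + FAIR_WRITE_LOCK.isFair()
        + " value=" + guardedWrite(1L, 2L));
  }
}
```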



