swamirishi commented on PR #9529:
URL: https://github.com/apache/ozone/pull/9529#issuecomment-3679663832

   This should ideally work
   ```
   Index: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
   --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
  (revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
   +++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
  (date 1766358462652)
   @@ -56,37 +56,26 @@
    
      // Thread Pools
      private final ExecutorService iteratorExecutor; // 5
   -  private final ExecutorService valueExecutors; // 20
   -
   -  private final int maxNumberOfVals;
   +  private final long maxNumberOfVals;
      private final OMMetadataManager metadataManager;
      private final int maxIteratorTasks;
   -  private final int maxWorkerTasks;
      private final long logCountThreshold;
    
      private static final Logger LOG = 
LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
    
      public ParallelTableIteratorOperation(OMMetadataManager metadataManager, 
Table<K, V> table, Codec<K> keyCodec,
   -                                        int iteratorCount, int workerCount, 
int maxNumberOfValsInMemory,
   -                                        long logThreshold) {
   +                                        int iteratorCount, long 
logThreshold) {
        this.table = table;
        this.keyCodec = keyCodec;
        this.metadataManager = metadataManager;
        this.maxIteratorTasks = 2 * iteratorCount;  // Allow up to 10 pending 
iterator tasks
   -    this.maxWorkerTasks = workerCount * 2;      // Allow up to 40 pending 
worker tasks
    
        // Create team of 5 iterator threads with UNLIMITED queue
        // LinkedBlockingQueue() with no size = can hold infinite pending tasks
        this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, 
iteratorCount, 1, TimeUnit.MINUTES,
                        new LinkedBlockingQueue<>());
   -
   -    // Create team of 20 worker threads with UNLIMITED queue
   -    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 
1, TimeUnit.MINUTES,
   -            new LinkedBlockingQueue<>());
   -
   -    // Calculate batch size per worker (e.g., 2000 / 20 = 100 keys per 
batch per worker)
   -    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / 
(workerCount));
        this.logCountThreshold = logThreshold;
   +    this.maxNumberOfVals = Math.max(1000, this.logCountThreshold / 
(iteratorCount));
      }
    
      private List<K> getBounds(K startKey, K endKey) throws IOException {
   @@ -166,9 +155,6 @@
        // Queue to track iterator threads (5 threads creating work)
        Queue<Future<?>> iterFutures = new LinkedList<>();
    
   -    // Queue to track worker threads (20 threads doing work)
   -    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
   -
        AtomicLong keyCounter = new AtomicLong();
        AtomicLong prevLogCounter = new AtomicLong();
        Object logLock = new Object();
   @@ -190,75 +176,42 @@
          iterFutures.add(iteratorExecutor.submit(() -> {
            try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter  = 
table.iterator()) {
              iter.seek(beg);
   +          int count = 0;
              while (iter.hasNext()) {
   -            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
   -            boolean reachedEnd = false;
   -            while (iter.hasNext()) {
   -              Table.KeyValue<K, V> kv = iter.next();
   -              K key = kv.getKey();
   -              
   -              // Check if key is within this segment's range
   -              boolean withinBounds;
   -              if (inclusive) {
   -                // Last segment: include everything from beg onwards (or 
until endKey if specified)
   -                withinBounds = (endKey == null || key.compareTo(endKey) <= 
0);
   -              } else {
   -                // Middle segment: include keys in range [beg, end)
   -                withinBounds = key.compareTo(end) < 0;
   -              }
   -              
   -              if (withinBounds) {
   -                keyValues.add(kv);
   -              } else {
   -                reachedEnd = true;
   -                break;
   -              }
   -
   -              // If batch is full (2000 keys), stop collecting
   -              if (keyValues.size() >= maxNumberOfVals) {
   -                break;
   -              }
   +            Table.KeyValue<K, V> kv = iter.next();
   +            K key = kv.getKey();
   +            // Check if key is within this segment's range
   +            boolean withinBounds;
   +            if (inclusive) {
   +              // Last segment: include everything from beg onwards (or 
until endKey if specified)
   +              withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
   +            } else {
   +              // Middle segment: include keys in range [beg, end)
   +              withinBounds = key.compareTo(end) < 0;
   +            }
   +            if (!withinBounds) {
   +              break;
                }
   -
   -            // ===== STEP 5: HAND BATCH TO WORKER THREAD =====
   -            if (!keyValues.isEmpty()) {
   -              // WAIT if worker queue is too full (max 39 pending tasks)
   -              waitForQueueSize(workerFutures, maxWorkerTasks - 1);
   -
   -              // Submit batch to worker thread pool
   -              workerFutures.add(valueExecutors.submit(() -> {
   -                for (Table.KeyValue<K, V> kv : keyValues) {
   -                  keyOperation.apply(kv);
   -                }
   -                keyCounter.addAndGet(keyValues.size());
   -                if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
   -                  synchronized (logLock) {
   -                    if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
   -                      long cnt = keyCounter.get();
   -                      LOG.debug("Iterated through {} keys while performing 
task: {}", keyCounter.get(), taskName);
   -                      prevLogCounter.set(cnt);
   -                    }
   +            keyOperation.apply(kv);
   +            count++;
   +            if (count % maxNumberOfVals == 0) {
   +              keyCounter.addAndGet(count);
   +              count = 0;
   +              if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
   +                synchronized (logLock) {
   +                  if (keyCounter.get() - prevLogCounter.get() > 
logCountThreshold) {
   +                    long cnt = keyCounter.get();
   +                    LOG.debug("Iterated through {} keys while performing 
task: {}", keyCounter.get(), taskName);
   +                    prevLogCounter.set(cnt);
                      }
                    }
   -                // Worker task done! Future is now complete.
   -              }));
   -            }
   -            // If we reached the end of our segment, stop reading
   -            if (reachedEnd) {
   -              break;
   +              }
                }
              }
   +          keyCounter.addAndGet(count);
            } catch (IOException e) {
              LOG.error("IO error during parallel iteration on table {}", 
taskName, e);
              throw new RuntimeException("IO error during iteration", e);
   -        } catch (InterruptedException e) {
   -          LOG.warn("Parallel iteration interrupted for task {}", taskName, 
e);
   -          Thread.currentThread().interrupt();
   -          throw new RuntimeException("Iteration interrupted", e);
   -        } catch (ExecutionException e) {
   -          Throwable cause = e.getCause();
   -          LOG.error("Task execution failed for {}: {}", taskName, 
cause.getMessage(), cause);
   -          throw new RuntimeException("Task execution failed", cause);
            }
          }));
        }
   @@ -266,8 +219,6 @@
        // ===== STEP 7: WAIT FOR EVERYONE TO FINISH =====
        // Wait for all 5 iterator threads to finish reading
        waitForQueueSize(iterFutures, 0);
   -    // Wait for all 20 worker threads to finish processing
   -    waitForQueueSize(workerFutures, 0);
        
        // Log final stats
        LOG.info("{}: Parallel iteration completed - Total keys processed: {}", 
taskName, keyCounter.get());
   @@ -276,17 +227,12 @@
      @Override
      public void close() throws IOException {
        iteratorExecutor.shutdown();
   -    valueExecutors.shutdown();
        try {
          if (!iteratorExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
            iteratorExecutor.shutdownNow();
          }
   -      if (!valueExecutors.awaitTermination(60, TimeUnit.SECONDS)) {
   -        valueExecutors.shutdownNow();
   -      }
        } catch (InterruptedException e) {
          iteratorExecutor.shutdownNow();
   -      valueExecutors.shutdownNow();
          Thread.currentThread().interrupt();
        }
      }
   Index: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
   --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
     (revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
   +++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
     (date 1766358462633)
   @@ -167,7 +167,7 @@
          
          try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
                   new ParallelTableIteratorOperation<>(omMetadataManager, 
omKeyInfoTable,
   -                   StringCodec.get(), maxIterators, maxWorkers, 
maxKeysInMemory, perWorkerThreshold)) {
   +                   StringCodec.get(), maxIterators, perWorkerThreshold)) {
            keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
          }
    
   Index: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
   --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
      (revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
   +++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
      (date 1766358462659)
   @@ -182,7 +182,7 @@
    
        try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
                 new ParallelTableIteratorOperation<>(omMetadataManager, 
omKeyInfoTable,
   -                 StringCodec.get(), maxIterators, maxWorkers, 
maxKeysInMemory, perWorkerThreshold)) {
   +                 StringCodec.get(), maxIterators, perWorkerThreshold)) {
          keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
        } catch (Exception ex) {
          LOG.error("Unable to populate File Size Count for {} in RocksDB.", 
taskName, ex);
   Index: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
   --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
   (revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
   +++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
   (date 1766358462664)
   @@ -35,6 +35,8 @@
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.commons.lang3.tuple.Triple;
    import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
   +import org.apache.hadoop.hdds.utils.db.CodecBuffer;
   +import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
    import org.apache.hadoop.hdds.utils.db.DBStore;
    import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
    import org.apache.hadoop.hdds.utils.db.StringCodec;
   @@ -192,8 +194,8 @@
      private void processTableInParallel(String tableName, OMMetadataManager 
omMetadataManager) throws Exception {
        int workerCount = 2;  // Only 2 workers needed for simple counting
        
   -    Table<String, byte[]> table = omMetadataManager.getStore()
   -        .getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), 
TableCache.CacheType.NO_CACHE);
   +    Table<String, CodecBuffer> table = omMetadataManager.getStore()
   +        .getTable(tableName, StringCodec.get(), CodecBufferCodec.get(true), 
TableCache.CacheType.NO_CACHE);
        
        long estimatedCount = 100000;  // Default
        try {
   @@ -205,9 +207,8 @@
        
        AtomicLong count = new AtomicLong(0);
    
   -    try (ParallelTableIteratorOperation<String, byte[]> parallelIter = new 
ParallelTableIteratorOperation<>(
   -        omMetadataManager, table, StringCodec.get(),
   -        maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
   +    try (ParallelTableIteratorOperation<String, CodecBuffer> parallelIter = 
new ParallelTableIteratorOperation<>(
   +        omMetadataManager, table, StringCodec.get(), maxIterators, 
loggingThreshold)) {
          
          parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
            if (kv != null) {
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to