This is an automated email from the ASF dual-hosted git repository.

devesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e0bd2cc716 HDDS-12185. Enhance FileSizeCountTask for Faster Processing. (#7796)
e0bd2cc716 is described below

commit e0bd2cc71621087e41cbc0b79f72b6487d271264
Author: Arafat2198 <[email protected]>
AuthorDate: Fri Feb 21 11:17:29 2025 +0530

    HDDS-12185. Enhance FileSizeCountTask for Faster Processing. (#7796)
---
 .../apache/hadoop/ozone/recon/ReconConstants.java  |  13 +
 .../hadoop/ozone/recon/ReconControllerModule.java  |  14 +-
 .../ozone/recon/tasks/FileSizeCountTaskFSO.java    |  70 +++++
 .../ozone/recon/tasks/FileSizeCountTaskHelper.java | 331 +++++++++++++++++++++
 .../ozone/recon/tasks/FileSizeCountTaskOBS.java    |  70 +++++
 .../ozone/recon/tasks/ReconTaskControllerImpl.java |   3 +
 .../hadoop/ozone/recon/api/TestEndpoints.java      |  64 +++-
 .../ozone/recon/tasks/TestFileSizeCountTask.java   | 258 +++++++++-------
 8 files changed, 695 insertions(+), 128 deletions(-)

diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 22b9d969bc..ecd88f8099 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.recon;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * Recon Server constants file.
  */
@@ -89,4 +91,15 @@ private ReconConstants() {
       (double) MAX_CONTAINER_SIZE_UPPER_BOUND /
           MIN_CONTAINER_SIZE_UPPER_BOUND) /
       Math.log(2)) + 1;
+
+  // For file-size count reprocessing: ensure only one task truncates the table.
+  public static final AtomicBoolean FILE_SIZE_COUNT_TABLE_TRUNCATED = new AtomicBoolean(false);
+
+  /**
+   * Resets the table-truncated flag. This should be called once per reprocess cycle,
+   * for example by the OM task controller, before the tasks run.
+   */
+  public static void resetTableTruncatedFlags() {
+    FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false);
+  }
 }
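
The flag above is paired with a compareAndSet guard in FileSizeCountTaskHelper (added later in this patch), so that when the FSO and OBS tasks reprocess concurrently, only one of them truncates FILE_COUNT_BY_SIZE per cycle. A minimal stand-alone sketch of that guard pattern, with illustrative names that are not part of the patch:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Whichever caller wins the compareAndSet performs the one-time action; the
    // other caller skips it. Clearing the flag re-arms the guard for the next cycle.
    class TruncateOnceDemo {
      static final AtomicBoolean TRUNCATED = new AtomicBoolean(false);

      static void truncateIfNeeded(Runnable truncate) {
        if (TRUNCATED.compareAndSet(false, true)) {
          truncate.run();           // reached by exactly one caller per cycle
        }
      }

      public static void main(String[] args) {
        Runnable truncate = () -> System.out.println("table truncated");
        truncateIfNeeded(truncate); // runs the action
        truncateIfNeeded(truncate); // no-op: flag already set
        TRUNCATED.set(false);       // reset before the next reprocess cycle
      }
    }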
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 42ab9a4716..127cbfcf9e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -57,7 +59,8 @@
 import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
-import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
 import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
 import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
@@ -129,12 +132,19 @@ protected void configure() {
       Multibinder<ReconOmTask> taskBinder =
           Multibinder.newSetBinder(binder(), ReconOmTask.class);
       taskBinder.addBinding().to(ContainerKeyMapperTask.class);
-      taskBinder.addBinding().to(FileSizeCountTask.class);
+      taskBinder.addBinding().to(FileSizeCountTaskFSO.class);
+      taskBinder.addBinding().to(FileSizeCountTaskOBS.class);
       taskBinder.addBinding().to(OmTableInsightTask.class);
       taskBinder.addBinding().to(NSSummaryTask.class);
     }
   }
 
+  @Provides
+  @Singleton
+  public ExecutorService provideReconExecutorService() {
+    return Executors.newFixedThreadPool(5);
+  }
+
   /**
    * Class that has all the DAO bindings in Recon.
    */
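
The module now also provides a fixed five-thread ExecutorService as a singleton. The code that consumes this binding is not part of this diff; as a rough, hypothetical sketch of how Guice resolves such a @Provides @Singleton binding (DemoModule and DemoMain are made-up names for illustration only):

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Provides;
    import com.google.inject.Singleton;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Stand-alone module mirroring the binding added above.
    class DemoModule extends AbstractModule {
      @Override
      protected void configure() {
        // No explicit bindings needed for this sketch.
      }

      @Provides
      @Singleton
      ExecutorService provideExecutorService() {
        return Executors.newFixedThreadPool(5);
      }
    }

    class DemoMain {
      public static void main(String[] args) {
        Injector injector = Guice.createInjector(new DemoModule());
        // Any @Inject constructor declaring an ExecutorService parameter receives this same pool.
        ExecutorService pool = injector.getInstance(ExecutorService.class);
        pool.submit(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdown();
      }
    }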
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
new file mode 100644
index 0000000000..f40a859590
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.jooq.DSLContext;
+
+/**
+ * Task for FileSystemOptimized (FSO) which processes the FILE_TABLE.
+ */
+public class FileSizeCountTaskFSO implements ReconOmTask {
+
+  private final FileCountBySizeDao fileCountBySizeDao;
+  private final DSLContext dslContext;
+
+  @Inject
+  public FileSizeCountTaskFSO(FileCountBySizeDao fileCountBySizeDao,
+                              UtilizationSchemaDefinition utilizationSchemaDefinition) {
+    this.fileCountBySizeDao = fileCountBySizeDao;
+    this.dslContext = utilizationSchemaDefinition.getDSLContext();
+  }
+
+  @Override
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+    return FileSizeCountTaskHelper.reprocess(
+        omMetadataManager,
+        dslContext,
+        fileCountBySizeDao,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        getTaskName()
+    );
+  }
+
+  @Override
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    // This task listens only on the FILE_TABLE.
+    return FileSizeCountTaskHelper.processEvents(
+        events,
+        OmMetadataManagerImpl.FILE_TABLE,
+        dslContext,
+        fileCountBySizeDao,
+        getTaskName());
+  }
+
+  @Override
+  public String getTaskName() {
+    return "FileSizeCountTaskFSO";
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
new file mode 100644
index 0000000000..489449d6a9
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.recon.ReconConstants;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class that encapsulates the common code for file size count tasks.
+ */
+public abstract class FileSizeCountTaskHelper {
+  protected static final Logger LOG = LoggerFactory.getLogger(FileSizeCountTaskHelper.class);
+
+  // Static lock to guard table truncation.
+  private static final Object TRUNCATE_LOCK = new Object();
+
+  /**
+   * Truncates the FILE_COUNT_BY_SIZE table if it has not been truncated yet.
+   * This method synchronizes on a static lock to ensure only one task truncates at a time.
+   * If an error occurs, the flag is reset to allow retrying the truncation.
+   *
+   * @param dslContext DSLContext for executing DB commands.
+   */
+  public static void truncateTableIfNeeded(DSLContext dslContext) {
+    synchronized (TRUNCATE_LOCK) {
+      if (ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.compareAndSet(false, true)) {
+        try {
+          int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
+          LOG.info("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);
+        } catch (Exception e) {
+          // Reset the flag so that truncation can be retried
+          ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false);
+          LOG.error("Error while truncating FILE_COUNT_BY_SIZE table, 
resetting flag.", e);
+          throw new RuntimeException("Table truncation failed", e);  // 
Propagate upwards
+        }
+      } else {
+        LOG.info("Table already truncated by another task; waiting for 
truncation to complete.");
+      }
+    }
+  }
+
+  /**
+   * Executes the reprocess method for the given task.
+   *
+   * @param omMetadataManager  OM metadata manager.
+   * @param dslContext         DSLContext for DB operations.
+   * @param fileCountBySizeDao DAO for file count table.
+   * @param bucketLayout       The bucket layout to process.
+   * @param taskName           The name of the task for logging.
+   * @return A Pair of task name and boolean indicating success.
+   */
+  public static Pair<String, Boolean> reprocess(OMMetadataManager 
omMetadataManager,
+                                                DSLContext dslContext,
+                                                FileCountBySizeDao 
fileCountBySizeDao,
+                                                BucketLayout bucketLayout,
+                                                String taskName) {
+    LOG.info("Starting Reprocess for {}", taskName);
+    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
+    long startTime = System.currentTimeMillis();
+    truncateTableIfNeeded(dslContext);
+    boolean status = reprocessBucketLayout(
+        bucketLayout, omMetadataManager, fileSizeCountMap, dslContext, fileCountBySizeDao, taskName);
+    if (!status) {
+      return new ImmutablePair<>(taskName, false);
+    }
+    writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+    long endTime = System.currentTimeMillis();
+    LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime - 
startTime));
+    return new ImmutablePair<>(taskName, true);
+  }
+
+  /**
+   * Iterates over the OM DB keys for the given bucket layout and updates the fileSizeCountMap.
+   *
+   * @param bucketLayout       The bucket layout to use.
+   * @param omMetadataManager  OM metadata manager.
+   * @param fileSizeCountMap   Map accumulating file size counts.
+   * @param dslContext         DSLContext for DB operations.
+   * @param fileCountBySizeDao DAO for file count table.
+   * @param taskName           The name of the task for logging.
+   * @return true if processing succeeds, false otherwise.
+   */
+  public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
+                                              OMMetadataManager omMetadataManager,
+                                              Map<FileSizeCountKey, Long> fileSizeCountMap,
+                                              DSLContext dslContext,
+                                              FileCountBySizeDao fileCountBySizeDao,
+                                              String taskName) {
+    Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
+    int totalKeysProcessed = 0;
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter =
+             omKeyInfoTable.iterator()) {
+      while (keyIter.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+        handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
+        totalKeysProcessed++;
+
+        // Flush to DB periodically.
+        if (fileSizeCountMap.size() >= 100000) {
+          writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+          fileSizeCountMap.clear();
+        }
+      }
+    } catch (IOException ioEx) {
+      LOG.error("Unable to populate File Size Count for {} in Recon DB.", 
taskName, ioEx);
+      return false;
+    }
+    LOG.info("Reprocessed {} keys for bucket layout {}.", totalKeysProcessed, 
bucketLayout);
+    return true;
+  }
+
+  /**
+   * Processes a batch of OM update events.
+   *
+   * @param events             OM update event batch.
+   * @param tableName          The OM table whose events should be processed (key table or file table).
+   * @param dslContext         DSLContext for DB operations.
+   * @param fileCountBySizeDao DAO for file count table.
+   * @param taskName           The name of the task for logging.
+   * @return A Pair of task name and boolean indicating success.
+   */
+  public static Pair<String, Boolean> processEvents(OMUpdateEventBatch events,
+                                                    String tableName,
+                                                    DSLContext dslContext,
+                                                    FileCountBySizeDao fileCountBySizeDao,
+                                                    String taskName) {
+    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
+    long startTime = System.currentTimeMillis();
+    while (eventIterator.hasNext()) {
+      OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
+      if (!tableName.equals(omdbUpdateEvent.getTable())) {
+        continue;
+      }
+      String updatedKey = omdbUpdateEvent.getKey();
+      Object value = omdbUpdateEvent.getValue();
+      Object oldValue = omdbUpdateEvent.getOldValue();
+      if (value instanceof OmKeyInfo) {
+        OmKeyInfo omKeyInfo = (OmKeyInfo) value;
+        OmKeyInfo omKeyInfoOld = (OmKeyInfo) oldValue;
+        try {
+          switch (omdbUpdateEvent.getAction()) {
+          case PUT:
+            handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
+            break;
+          case DELETE:
+            handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
+            break;
+          case UPDATE:
+            if (omKeyInfoOld != null) {
+              handleDeleteKeyEvent(updatedKey, omKeyInfoOld, fileSizeCountMap);
+              handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
+            } else {
+              LOG.warn("Update event does not have the old keyInfo for {}.", 
updatedKey);
+            }
+            break;
+          default:
+            LOG.trace("Skipping DB update event: {}", omdbUpdateEvent.getAction());
+          }
+        } catch (Exception e) {
+          LOG.error("Unexpected exception while processing key {}.", 
updatedKey, e);
+          return new ImmutablePair<>(taskName, false);
+        }
+      } else {
+        LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
+            value.getClass().getName(), updatedKey);
+      }
+    }
+    writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+    LOG.debug("{} successfully processed in {} milliseconds", taskName,
+        (System.currentTimeMillis() - startTime));
+    return new ImmutablePair<>(taskName, true);
+  }
+
+  /**
+   * Writes the accumulated file size counts to the DB.
+   *
+   * @param fileSizeCountMap   Map of file size counts.
+   * @param dslContext         DSLContext for DB operations.
+   * @param fileCountBySizeDao DAO for file count table.
+   */
+  public static void writeCountsToDB(Map<FileSizeCountKey, Long> fileSizeCountMap,
+                                     DSLContext dslContext,
+                                     FileCountBySizeDao fileCountBySizeDao) {
+
+    List<FileCountBySize> insertToDb = new ArrayList<>();
+    List<FileCountBySize> updateInDb = new ArrayList<>();
+    boolean isDbTruncated = isFileCountBySizeTableEmpty(dslContext); // Check if table is empty
+
+    fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
+      FileCountBySize newRecord = new FileCountBySize();
+      newRecord.setVolume(key.volume);
+      newRecord.setBucket(key.bucket);
+      newRecord.setFileSize(key.fileSizeUpperBound);
+      newRecord.setCount(fileSizeCountMap.get(key));
+      if (!isDbTruncated) {
+        // Get the current count from database and update
+        Record3<String, String, Long> recordToFind =
+            dslContext.newRecord(
+                    FILE_COUNT_BY_SIZE.VOLUME,
+                    FILE_COUNT_BY_SIZE.BUCKET,
+                    FILE_COUNT_BY_SIZE.FILE_SIZE)
+                .value1(key.volume)
+                .value2(key.bucket)
+                .value3(key.fileSizeUpperBound);
+        FileCountBySize fileCountRecord =
+            fileCountBySizeDao.findById(recordToFind);
+        if (fileCountRecord == null && newRecord.getCount() > 0L) {
+          // insert new row only for non-zero counts.
+          insertToDb.add(newRecord);
+        } else if (fileCountRecord != null) {
+          newRecord.setCount(fileCountRecord.getCount() +
+              fileSizeCountMap.get(key));
+          updateInDb.add(newRecord);
+        }
+      } else if (newRecord.getCount() > 0) {
+        // insert new row only for non-zero counts.
+        insertToDb.add(newRecord);
+      }
+    });
+    fileCountBySizeDao.insert(insertToDb);
+    fileCountBySizeDao.update(updateInDb);
+  }
+
+  /**
+   * Increments the count for a given key on a PUT event.
+   */
+  public static void handlePutKeyEvent(OmKeyInfo omKeyInfo,
+                                       Map<FileSizeCountKey, Long> fileSizeCountMap) {
+    FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
+    Long count = fileSizeCountMap.containsKey(key) ? fileSizeCountMap.get(key) + 1L : 1L;
+    fileSizeCountMap.put(key, count);
+  }
+
+  /**
+   * Decrements the count for a given key on a DELETE event.
+   */
+  public static void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
+                                          Map<FileSizeCountKey, Long> fileSizeCountMap) {
+    if (omKeyInfo == null) {
+      LOG.warn("Key not found in Recon OM DB while handling DELETE key event: {}", key);
+    } else {
+      FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
+      Long count = fileSizeCountMap.containsKey(countKey) ? fileSizeCountMap.get(countKey) - 1L : -1L;
+      fileSizeCountMap.put(countKey, count);
+    }
+  }
+
+  /**
+   * Returns a FileSizeCountKey for the given OmKeyInfo.
+   */
+  public static FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
+    return new FileSizeCountKey(omKeyInfo.getVolumeName(),
+        omKeyInfo.getBucketName(),
+        ReconUtils.getFileSizeUpperBound(omKeyInfo.getDataSize()));
+  }
+
+  /**
+   * Checks if the FILE_COUNT_BY_SIZE table is empty.
+   */
+  public static boolean isFileCountBySizeTableEmpty(DSLContext dslContext) {
+    return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0;
+  }
+
+  /**
+   * Helper key class used for grouping file size counts.
+   */
+  public static class FileSizeCountKey {
+    private final String volume;
+    private final String bucket;
+    private final Long fileSizeUpperBound;
+
+    public FileSizeCountKey(String volume, String bucket, Long fileSizeUpperBound) {
+      this.volume = volume;
+      this.bucket = bucket;
+      this.fileSizeUpperBound = fileSizeUpperBound;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof FileSizeCountKey) {
+        FileSizeCountKey other = (FileSizeCountKey) obj;
+        return volume.equals(other.volume) &&
+            bucket.equals(other.bucket) &&
+            fileSizeUpperBound.equals(other.fileSizeUpperBound);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return (volume + bucket + fileSizeUpperBound).hashCode();
+    }
+  }
+}
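
The helper maps each key's size to a bin through ReconUtils.getFileSizeUpperBound, which is not part of this diff. A minimal sketch of the binning rule implied by the tests below (1000L -> 1024L, 100000L -> 131072L, and sizes past the largest bin collapsing into Long.MAX_VALUE); the method name and the max-bin parameter here are assumptions for illustration, not the Recon implementation:

    // Hypothetical sketch: upper bounds are powers of two starting at 1 KB, and any
    // size beyond the largest configured bin falls into a single Long.MAX_VALUE bin.
    final class FileSizeBinDemo {
      static long fileSizeUpperBound(long dataSize, long maxBinUpperBound) {
        long bound = 1024L;                              // smallest bin: 1 KB
        while (bound < dataSize) {
          bound <<= 1;
          if (bound > maxBinUpperBound || bound <= 0) {  // past the last bin, or overflowed
            return Long.MAX_VALUE;
          }
        }
        return bound;
      }

      public static void main(String[] args) {
        long maxBin = 1L << 51;                                   // assumed largest regular bin
        System.out.println(fileSizeUpperBound(1000L, maxBin));    // 1024
        System.out.println(fileSizeUpperBound(100000L, maxBin));  // 131072
        System.out.println(fileSizeUpperBound(4L * 1125899906842624L, maxBin)); // 9223372036854775807
      }
    }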
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
new file mode 100644
index 0000000000..acaab763ac
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.jooq.DSLContext;
+
+/**
+ * Task for ObjectStore (OBS) which processes the KEY_TABLE.
+ */
+public class FileSizeCountTaskOBS implements ReconOmTask {
+
+  private final FileCountBySizeDao fileCountBySizeDao;
+  private final DSLContext dslContext;
+
+  @Inject
+  public FileSizeCountTaskOBS(FileCountBySizeDao fileCountBySizeDao,
+                              UtilizationSchemaDefinition utilizationSchemaDefinition) {
+    this.fileCountBySizeDao = fileCountBySizeDao;
+    this.dslContext = utilizationSchemaDefinition.getDSLContext();
+  }
+
+  @Override
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+    return FileSizeCountTaskHelper.reprocess(
+        omMetadataManager,
+        dslContext,
+        fileCountBySizeDao,
+        BucketLayout.OBJECT_STORE,
+        getTaskName()
+    );
+  }
+
+  @Override
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    // This task listens only on the KEY_TABLE.
+    return FileSizeCountTaskHelper.processEvents(
+        events,
+        OmMetadataManagerImpl.KEY_TABLE,
+        dslContext,
+        fileCountBySizeDao,
+        getTaskName());
+  }
+
+  @Override
+  public String getTaskName() {
+    return "FileSizeCountTaskOBS";
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index c2a967415c..c1d786db1a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -39,6 +39,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconConstants;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.tasks.types.NamedCallableTask;
 import org.apache.hadoop.ozone.recon.tasks.types.TaskExecutionException;
@@ -122,6 +123,7 @@ public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataMa
       }
 
       // Reprocess the failed tasks.
+      ReconConstants.resetTableTruncatedFlags();
       if (!retryFailedTasks.isEmpty()) {
         tasks.clear();
         for (String taskName : failedTasks) {
@@ -154,6 +156,7 @@ private void ignoreFailedTasks(List<String> failedTasks) {
   @Override
  public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager) {
    Collection<NamedCallableTask<Pair<String, Boolean>>> tasks = new ArrayList<>();
+    ReconConstants.resetTableTruncatedFlags();
     for (Map.Entry<String, ReconOmTask> taskEntry :
         reconOmTasks.entrySet()) {
       ReconOmTask task = taskEntry.getValue();
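
The two hunks above clear the truncation flag once per cycle: before retrying failed tasks in consumeOMEvents and before reInitializeTasks submits all registered tasks. A self-contained model of that contract, using illustrative names rather than the Recon classes:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    // The controller resets the shared flag, then every task's reprocess() may
    // attempt the truncation; only the first attempt in the cycle succeeds.
    final class ReprocessCycleDemo {
      static final AtomicBoolean TABLE_TRUNCATED = new AtomicBoolean(false);

      interface Task {
        void reprocess();
      }

      static Task namedTask(String name) {
        return () -> {
          if (TABLE_TRUNCATED.compareAndSet(false, true)) {
            System.out.println(name + " truncated the table");
          }
          System.out.println(name + " repopulated its counts");
        };
      }

      public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(
            namedTask("FileSizeCountTaskFSO"), namedTask("FileSizeCountTaskOBS"));
        TABLE_TRUNCATED.set(false);      // mirrors ReconConstants.resetTableTruncatedFlags()
        tasks.forEach(Task::reprocess);  // only the first task truncates; both repopulate
      }
    }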
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 5a667bd7c7..52ac1d64f4 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -27,6 +27,7 @@
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDeletedKeysToOm;
 import static org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl.PROMETHEUS_INSTANT_QUERY_API;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
 import static org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -131,7 +132,8 @@
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask;
-import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.ozone.test.LambdaTestUtils;
@@ -143,6 +145,7 @@
 import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
 import org.jooq.Configuration;
 import org.jooq.DSLContext;
+import org.jooq.Record3;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -162,7 +165,8 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
   private VolumeEndpoint volumeEndpoint;
   private BucketEndpoint bucketEndpoint;
   private ReconOMMetadataManager reconOMMetadataManager;
-  private FileSizeCountTask fileSizeCountTask;
+  private FileSizeCountTaskFSO fileSizeCountTaskFSO;
+  private FileSizeCountTaskOBS fileSizeCountTaskOBS;
   private ContainerSizeCountTask containerSizeCountTask;
   private OmTableInsightTask omTableInsightTask;
   private ReconStorageContainerManagerFacade reconScm;
@@ -305,8 +309,10 @@ private void initializeInjector() throws Exception {
         fileCountBySizeDao,
         containerCountBySizeDao,
         utilizationSchemaDefinition);
-    fileSizeCountTask =
-        new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+    fileSizeCountTaskFSO =
+        new FileSizeCountTaskFSO(fileCountBySizeDao, utilizationSchemaDefinition);
+    fileSizeCountTaskOBS =
+        new FileSizeCountTaskOBS(fileCountBySizeDao, utilizationSchemaDefinition);
     omTableInsightTask =
         new OmTableInsightTask(globalStatsDao, sqlConfiguration,
             reconOMMetadataManager);
@@ -835,7 +841,7 @@ public void testGetFileCounts() throws Exception {
     when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy);
     when(keyTableFso.iterator()).thenReturn(mockKeyIterFso);
 
-    when(omMetadataManager.getKeyTable(BucketLayout.LEGACY)).thenReturn(
+    when(omMetadataManager.getKeyTable(BucketLayout.OBJECT_STORE)).thenReturn(
         keyTableLegacy);
     when(omMetadataManager.getKeyTable(
         BucketLayout.FILE_SYSTEM_OPTIMIZED)).thenReturn(keyTableFso);
@@ -862,11 +868,37 @@ public void testGetFileCounts() throws Exception {
         .thenReturn(omKeyInfo2)
         .thenReturn(omKeyInfo3);
 
-    Pair<String, Boolean> result =
-        fileSizeCountTask.reprocess(omMetadataManager);
-    assertTrue(result.getRight());
+    // Call reprocess on both endpoints.
+    Pair<String, Boolean> resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+    Pair<String, Boolean> resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+    assertTrue(resultOBS.getRight());
+    assertTrue(resultFSO.getRight());
 
+    // The two tasks should result in 3 rows.
     assertEquals(3, fileCountBySizeDao.count());
+
+    // Verify counts:
+    // For vol1/bucket1 with fileSize 1000L, the upper bound is 1024L and expected count is 2.
+    Record3<String, String, Long> recordToFind = dslContext.newRecord(
+            FILE_COUNT_BY_SIZE.VOLUME,
+            FILE_COUNT_BY_SIZE.BUCKET,
+            FILE_COUNT_BY_SIZE.FILE_SIZE)
+        .value1("vol1")
+        .value2("bucket1")
+        .value3(1024L);
+    assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+    // For vol1/bucket1 with fileSize 100000L, the upper bound is 131072L and expected count is 2.
+    recordToFind.value3(131072L);
+    assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+    // For vol2/bucket1 with fileSize 1000L, the upper bound is 1024L and expected count is 2.
+    recordToFind.value1("vol2");
+    recordToFind.value2("bucket1");
+    recordToFind.value3(1024L);
+    assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+    // --- Now test the query endpoints of the utilization service ---
     Response response = utilizationEndpoint.getFileCounts(null, null, 0);
     List<FileCountBySize> resultSet =
         (List<FileCountBySize>) response.getEntity();
@@ -881,38 +913,38 @@ public void testGetFileCounts() throws Exception {
         o.getBucket().equals("bucket1") && o.getFileSize() == 1024L &&
         o.getCount() == 2L));
 
-    // Test for "volume" query param
+    // Test for "volume" query param.
     response = utilizationEndpoint.getFileCounts("vol1", null, 0);
     resultSet = (List<FileCountBySize>) response.getEntity();
     assertEquals(2, resultSet.size());
     assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1")));
 
-    // Test for non-existent volume
+    // Test for non-existent volume.
     response = utilizationEndpoint.getFileCounts("vol", null, 0);
     resultSet = (List<FileCountBySize>) response.getEntity();
     assertEquals(0, resultSet.size());
 
-    // Test for "volume" + "bucket" query param
+    // Test for "volume" + "bucket" query param.
     response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 0);
     resultSet = (List<FileCountBySize>) response.getEntity();
     assertEquals(2, resultSet.size());
     assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1") &&
         o.getBucket().equals("bucket1")));
 
-    // Test for non-existent bucket
+    // Test for non-existent bucket.
     response = utilizationEndpoint.getFileCounts("vol1", "bucket", 0);
     resultSet = (List<FileCountBySize>) response.getEntity();
     assertEquals(0, resultSet.size());
 
-    // Test for "volume" + "bucket" + "fileSize" query params
+    // Test for "volume" + "bucket" + "fileSize" query params.
     response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 131072);
     resultSet = (List<FileCountBySize>) response.getEntity();
     assertEquals(1, resultSet.size());
     FileCountBySize o = resultSet.get(0);
-    assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals(
-        "bucket1") && o.getFileSize() == 131072);
+    assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals("bucket1") &&
+        o.getFileSize() == 131072);
 
-    // Test for non-existent fileSize
+    // Test for non-existent fileSize.
     response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 1310725);
     resultSet = (List<FileCountBySize>) response.getEntity();
     assertEquals(0, resultSet.size());
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index 2c22541d02..30ca22154b 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -22,6 +22,7 @@
 import static 
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
 import static 
org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.AdditionalAnswers.returnsElementsOf;
 import static org.mockito.BDDMockito.given;
@@ -55,8 +56,10 @@
 public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
 
   private FileCountBySizeDao fileCountBySizeDao;
-  private FileSizeCountTask fileSizeCountTask;
+  private FileSizeCountTaskOBS fileSizeCountTaskOBS;
+  private FileSizeCountTaskFSO fileSizeCountTaskFSO;
   private DSLContext dslContext;
+  private UtilizationSchemaDefinition utilizationSchemaDefinition;
 
   public TestFileSizeCountTask() {
     super();
@@ -65,24 +68,28 @@ public TestFileSizeCountTask() {
   @BeforeEach
   public void setUp() {
     fileCountBySizeDao = getDao(FileCountBySizeDao.class);
-    UtilizationSchemaDefinition utilizationSchemaDefinition =
-        getSchemaDefinition(UtilizationSchemaDefinition.class);
-    fileSizeCountTask =
-        new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+    utilizationSchemaDefinition = getSchemaDefinition(UtilizationSchemaDefinition.class);
+    // Create separate task instances.
+    fileSizeCountTaskOBS = new FileSizeCountTaskOBS(fileCountBySizeDao, utilizationSchemaDefinition);
+    fileSizeCountTaskFSO = new FileSizeCountTaskFSO(fileCountBySizeDao, utilizationSchemaDefinition);
     dslContext = utilizationSchemaDefinition.getDSLContext();
-    // Truncate table before running each test
+    // Truncate table before each test.
     dslContext.truncate(FILE_COUNT_BY_SIZE);
   }
 
   @Test
   public void testReprocess() throws IOException {
+    // Create three sample OmKeyInfo objects.
     OmKeyInfo[] omKeyInfos = new OmKeyInfo[3];
     String[] keyNames = {"key1", "key2", "key3"};
     String[] volumeNames = {"vol1", "vol1", "vol1"};
     String[] bucketNames = {"bucket1", "bucket1", "bucket1"};
+    // Use sizes so that each falls into a distinct bin:
+    //  - 1000L   falls into first bin (upper bound 1024L)
+    //  - 100000L falls into second bin (upper bound 131072L)
+    //  - 4PB (i.e. 1125899906842624L * 4) falls into the highest bin (upper bound Long.MAX_VALUE)
     Long[] dataSizes = {1000L, 100000L, 1125899906842624L * 4};
 
-    // Loop to initialize each instance of OmKeyInfo
     for (int i = 0; i < 3; i++) {
       omKeyInfos[i] = mock(OmKeyInfo.class);
       given(omKeyInfos[i].getKeyName()).willReturn(keyNames[i]);
@@ -91,88 +98,82 @@ public void testReprocess() throws IOException {
       given(omKeyInfos[i].getDataSize()).willReturn(dataSizes[i]);
     }
 
-    // Create two mock instances of TypedTable, one for FILE_SYSTEM_OPTIMIZED
-    // layout and one for LEGACY layout
+    // Prepare the OMMetadataManager mock.
     OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
-    TypedTable<String, OmKeyInfo> keyTableLegacy = mock(TypedTable.class);
-    TypedTable<String, OmKeyInfo> keyTableFso = mock(TypedTable.class);
 
-    // Set return values for getKeyTable() for FILE_SYSTEM_OPTIMIZED
-    // and LEGACY layout
-    when(omMetadataManager.getKeyTable(eq(BucketLayout.LEGACY)))
-        .thenReturn(keyTableLegacy);
+    // Configure the OBS (OBJECT_STORE) endpoint.
+    TypedTable<String, OmKeyInfo> keyTableOBS = mock(TypedTable.class);
+    // Note: Even though legacy and OBS share the same underlying table, we simulate OBS here.
+    when(omMetadataManager.getKeyTable(eq(BucketLayout.OBJECT_STORE)))
+        .thenReturn(keyTableOBS);
+    TypedTable.TypedTableIterator mockIterOBS = mock(TypedTable.TypedTableIterator.class);
+    when(keyTableOBS.iterator()).thenReturn(mockIterOBS);
+    // Simulate three keys then end.
+    when(mockIterOBS.hasNext()).thenReturn(true, true, true, false);
+    TypedTable.TypedKeyValue mockKeyValueOBS = mock(TypedTable.TypedKeyValue.class);
+    when(mockIterOBS.next()).thenReturn(mockKeyValueOBS);
+    when(mockKeyValueOBS.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], omKeyInfos[2]);
+
+    // Configure the FSO (FILE_SYSTEM_OPTIMIZED) endpoint.
+    TypedTable<String, OmKeyInfo> keyTableFSO = mock(TypedTable.class);
     when(omMetadataManager.getKeyTable(eq(BucketLayout.FILE_SYSTEM_OPTIMIZED)))
-        .thenReturn(keyTableFso);
-
-    // Create two mock instances of TypedTableIterator, one for each
-    // instance of TypedTable
-    TypedTable.TypedTableIterator mockKeyIterLegacy =
-        mock(TypedTable.TypedTableIterator.class);
-    when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy);
-    // Set return values for hasNext() and next() of the mock instance of
-    // TypedTableIterator for keyTableLegacy
-    when(mockKeyIterLegacy.hasNext()).thenReturn(true, true, true, false);
-    TypedTable.TypedKeyValue mockKeyValueLegacy =
-        mock(TypedTable.TypedKeyValue.class);
-    when(mockKeyIterLegacy.next()).thenReturn(mockKeyValueLegacy);
-    when(mockKeyValueLegacy.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1],
-        omKeyInfos[2]);
-
-
-    // Same as above, but for keyTableFso
-    TypedTable.TypedTableIterator mockKeyIterFso =
-        mock(TypedTable.TypedTableIterator.class);
-    when(keyTableFso.iterator()).thenReturn(mockKeyIterFso);
-    when(mockKeyIterFso.hasNext()).thenReturn(true, true, true, false);
-    TypedTable.TypedKeyValue mockKeyValueFso =
-        mock(TypedTable.TypedKeyValue.class);
-    when(mockKeyIterFso.next()).thenReturn(mockKeyValueFso);
-    when(mockKeyValueFso.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1],
-        omKeyInfos[2]);
-
-    // Reprocess could be called from table having existing entries. Adding
-    // an entry to simulate that.
-    fileCountBySizeDao.insert(
-        new FileCountBySize("vol1", "bucket1", 1024L, 10L));
-
-    Pair<String, Boolean> result =
-        fileSizeCountTask.reprocess(omMetadataManager);
-
-    // Verify that the result of reprocess is true
-    assertTrue(result.getRight());
-
-    // Verify that the number of entries in fileCountBySizeDao is 3
-    assertEquals(3, fileCountBySizeDao.count());
-
-    // Create a record to find the count of files in a specific volume,
-    // bucket and file size
-    Record3<String, String, Long> recordToFind = dslContext
-        .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+        .thenReturn(keyTableFSO);
+    TypedTable.TypedTableIterator mockIterFSO = mock(TypedTable.TypedTableIterator.class);
+    when(keyTableFSO.iterator()).thenReturn(mockIterFSO);
+    when(mockIterFSO.hasNext()).thenReturn(true, true, true, false);
+    TypedTable.TypedKeyValue mockKeyValueFSO = mock(TypedTable.TypedKeyValue.class);
+    when(mockIterFSO.next()).thenReturn(mockKeyValueFSO);
+    when(mockKeyValueFSO.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], omKeyInfos[2]);
+
+    // Simulate a preexisting entry in the DB.
+    // (This record will be removed by the first task via table truncation.)
+    fileCountBySizeDao.insert(new FileCountBySize("vol1", "bucket1", 1024L, 10L));
+
+    // Call reprocess on both tasks.
+    Pair<String, Boolean> resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+    Pair<String, Boolean> resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+
+    // Verify that both tasks reported success.
+    assertTrue(resultOBS.getRight(), "OBS reprocess should return true");
+    assertTrue(resultFSO.getRight(), "FSO reprocess should return true");
+
+    // After processing, there should be 3 rows (one per bin).
+    assertEquals(3, fileCountBySizeDao.count(), "Expected 3 rows in the DB");
+
+    // Now verify the counts in each bin.
+    // Because each task processes the same 3 keys and each key contributes a count of 1,
+    // the final count per bin should be 2 (1 from OBS + 1 from FSO).
+
+    // Verify bin for key size 1000L -> upper bound 1024L.
+    Record3<String, String, Long> recordToFind = dslContext.newRecord(
+            FILE_COUNT_BY_SIZE.VOLUME,
             FILE_COUNT_BY_SIZE.BUCKET,
             FILE_COUNT_BY_SIZE.FILE_SIZE)
         .value1("vol1")
         .value2("bucket1")
         .value3(1024L);
-    assertEquals(2L,
-        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
-    // file size upper bound for 100000L is 131072L (next highest power of 2)
+    assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(),
+        "Expected bin 1024 to have count 2");
+
+    // Verify bin for key size 100000L -> upper bound 131072L.
     recordToFind.value3(131072L);
-    assertEquals(2L,
-        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
-    // file size upper bound for 4PB is Long.MAX_VALUE
+    assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(),
+        "Expected bin 131072 to have count 2");
+
+    // Verify bin for key size 4PB -> upper bound Long.MAX_VALUE.
     recordToFind.value3(Long.MAX_VALUE);
-    assertEquals(2L,
-        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+    assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(),
+        "Expected bin Long.MAX_VALUE to have count 2");
   }
 
   @Test
   public void testProcess() {
-    // Write 2 keys.
+    // First batch: Write 2 keys.
     OmKeyInfo toBeDeletedKey = mock(OmKeyInfo.class);
     given(toBeDeletedKey.getVolumeName()).willReturn("vol1");
     given(toBeDeletedKey.getBucketName()).willReturn("bucket1");
     given(toBeDeletedKey.getKeyName()).willReturn("deletedKey");
-    given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Bin 1
+    given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Falls in bin with upper bound 2048L
     OMDBUpdateEvent event = new OMUpdateEventBuilder()
         .setAction(PUT)
         .setKey("deletedKey")
@@ -184,7 +185,7 @@ public void testProcess() {
     given(toBeUpdatedKey.getVolumeName()).willReturn("vol1");
     given(toBeUpdatedKey.getBucketName()).willReturn("bucket1");
     given(toBeUpdatedKey.getKeyName()).willReturn("updatedKey");
-    given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Bin 4
+    given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Falls in bin with upper bound 16384L
     OMDBUpdateEvent event2 = new OMUpdateEventBuilder()
         .setAction(PUT)
         .setKey("updatedKey")
@@ -194,9 +195,14 @@ public void testProcess() {
 
     OMUpdateEventBatch omUpdateEventBatch =
         new OMUpdateEventBatch(Arrays.asList(event, event2), 0L);
-    fileSizeCountTask.process(omUpdateEventBatch);
 
-    // Verify 2 keys are in correct bins.
+    // Process the same batch on both endpoints.
+    fileSizeCountTaskOBS.process(omUpdateEventBatch);
+    fileSizeCountTaskFSO.process(omUpdateEventBatch);
+
+    // After processing the first batch:
+    // Since each endpoint processes the same events, the counts are doubled.
+    // Expected: 2 rows (bins 2048 and 16384) with counts 2 each.
     assertEquals(2, fileCountBySizeDao.count());
     Record3<String, String, Long> recordToFind = dslContext
         .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
@@ -212,6 +218,7 @@ public void testProcess() {
     assertEquals(1L,
         fileCountBySizeDao.findById(recordToFind).getCount().longValue());
 
+    // Second batch: Process update events.
     // Add new key.
     OmKeyInfo newKey = mock(OmKeyInfo.class);
     given(newKey.getVolumeName()).willReturn("vol1");
@@ -249,7 +256,8 @@ public void testProcess() {
 
     omUpdateEventBatch = new OMUpdateEventBatch(
         Arrays.asList(updateEvent, putEvent, deleteEvent), 0L);
-    fileSizeCountTask.process(omUpdateEventBatch);
+    fileSizeCountTaskOBS.process(omUpdateEventBatch);
+    fileSizeCountTaskFSO.process(omUpdateEventBatch);
 
     assertEquals(4, fileCountBySizeDao.count());
     recordToFind.value3(1024L);
@@ -268,8 +276,7 @@ public void testProcess() {
 
   @Test
   public void testReprocessAtScale() throws IOException {
-    // generate mocks for 2 volumes, 500 buckets each volume
-    // and 42 keys in each bucket.
+    // generate mocks for 2 volumes, 500 buckets each volume, and 42 keys in each bucket.
     List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
     List<Boolean> hasNextAnswer = new ArrayList<>();
     for (int volIndex = 1; volIndex <= 2; volIndex++) {
@@ -279,8 +286,8 @@ public void testReprocessAtScale() throws IOException {
           given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
           given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
           given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
-          // Place keys in each bin
-          long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+          // Each key's fileSize = 2^(keyIndex+9) - 1, so that it falls into its respective bin.
+          long fileSize = (long) Math.pow(2, keyIndex + 9) - 1L;
           given(omKeyInfo.getDataSize()).willReturn(fileSize);
           omKeyInfoList.add(omKeyInfo);
           hasNextAnswer.add(true);
@@ -290,46 +297,43 @@ public void testReprocessAtScale() throws IOException {
     hasNextAnswer.add(false);
 
     OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+    // Create two mock key tables: one for OBS (using LEGACY in this test) and one for FSO.
     TypedTable<String, OmKeyInfo> keyTableLegacy = mock(TypedTable.class);
     TypedTable<String, OmKeyInfo> keyTableFso = mock(TypedTable.class);
 
-    TypedTable.TypedTableIterator mockKeyIterLegacy = mock(TypedTable
-        .TypedTableIterator.class);
-    TypedTable.TypedTableIterator mockKeyIterFso = mock(TypedTable
-        .TypedTableIterator.class);
-    TypedTable.TypedKeyValue mockKeyValueLegacy = mock(
-        TypedTable.TypedKeyValue.class);
-    TypedTable.TypedKeyValue mockKeyValueFso = mock(
-        TypedTable.TypedKeyValue.class);
+    TypedTable.TypedTableIterator mockKeyIterLegacy = mock(TypedTable.TypedTableIterator.class);
+    TypedTable.TypedTableIterator mockKeyIterFso = mock(TypedTable.TypedTableIterator.class);
+    TypedTable.TypedKeyValue mockKeyValueLegacy = mock(TypedTable.TypedKeyValue.class);
+    TypedTable.TypedKeyValue mockKeyValueFso = mock(TypedTable.TypedKeyValue.class);
 
     when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy);
     when(keyTableFso.iterator()).thenReturn(mockKeyIterFso);
 
-    when(omMetadataManager.getKeyTable(BucketLayout.LEGACY))
-        .thenReturn(keyTableLegacy);
-    when(omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED))
-        .thenReturn(keyTableFso);
+    // In this test, the OBS task reads BucketLayout.OBJECT_STORE and the FSO task reads FILE_SYSTEM_OPTIMIZED.
+    when(omMetadataManager.getKeyTable(BucketLayout.OBJECT_STORE)).thenReturn(keyTableLegacy);
+    when(omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)).thenReturn(keyTableFso);
 
-    when(mockKeyIterLegacy.hasNext())
-        .thenAnswer(returnsElementsOf(hasNextAnswer));
-    when(mockKeyIterFso.hasNext())
-        .thenAnswer(returnsElementsOf(hasNextAnswer));
+    when(mockKeyIterLegacy.hasNext()).thenAnswer(returnsElementsOf(hasNextAnswer));
+    when(mockKeyIterFso.hasNext()).thenAnswer(returnsElementsOf(hasNextAnswer));
     when(mockKeyIterLegacy.next()).thenReturn(mockKeyValueLegacy);
     when(mockKeyIterFso.next()).thenReturn(mockKeyValueFso);
 
-    when(mockKeyValueLegacy.getValue())
-        .thenAnswer(returnsElementsOf(omKeyInfoList));
-    when(mockKeyValueFso.getValue())
-        .thenAnswer(returnsElementsOf(omKeyInfoList));
+    when(mockKeyValueLegacy.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList));
+    when(mockKeyValueFso.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList));
 
-    Pair<String, Boolean> result =
-        fileSizeCountTask.reprocess(omMetadataManager);
-    assertTrue(result.getRight());
+    // Call reprocess on both endpoints.
+    Pair<String, Boolean> resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+    Pair<String, Boolean> resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+    assertTrue(resultOBS.getRight());
+    assertTrue(resultFSO.getRight());
 
     // 2 volumes * 500 buckets * 42 bins = 42000 rows
     assertEquals(42000, fileCountBySizeDao.count());
-    Record3<String, String, Long> recordToFind = dslContext
-        .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+
+    // Verify counts for a few representative bins.
+    // For volume "vol1", bucket "bucket1", the first bin (upper bound 1024L) 
should have a count of 2.
+    Record3<String, String, Long> recordToFind = dslContext.newRecord(
+            FILE_COUNT_BY_SIZE.VOLUME,
             FILE_COUNT_BY_SIZE.BUCKET,
             FILE_COUNT_BY_SIZE.FILE_SIZE)
         .value1("vol1")
@@ -337,11 +341,13 @@ public void testReprocessAtScale() throws IOException {
         .value3(1024L);
     assertEquals(2L,
         fileCountBySizeDao.findById(recordToFind).getCount().longValue());
-    // file size upper bound for 100000L is 131072L (next highest power of 2)
-    recordToFind.value1("vol1");
+
+    // For volume "vol1", bucket "bucket1", the bin with upper bound 131072L 
should have a count of 2.
     recordToFind.value3(131072L);
     assertEquals(2L,
         fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+    // For volume "vol1", bucket "bucket500", the highest bin (upper bound 
Long.MAX_VALUE) should have a count of 2.
     recordToFind.value2("bucket500");
     recordToFind.value3(Long.MAX_VALUE);
     assertEquals(2L,
@@ -385,9 +391,10 @@ public void testProcessAtScale() {
       }
     }
 
-    OMUpdateEventBatch omUpdateEventBatch =
-        new OMUpdateEventBatch(omDbEventList, 0L);
-    fileSizeCountTask.process(omUpdateEventBatch);
+    OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L);
+    // Process the same batch on both endpoints.
+    fileSizeCountTaskOBS.process(omUpdateEventBatch);
+    fileSizeCountTaskFSO.process(omUpdateEventBatch);
 
     // Verify 2 keys are in correct bins.
     assertEquals(10000, fileCountBySizeDao.count());
@@ -463,7 +470,8 @@ public void testProcessAtScale() {
     }
 
     omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L);
-    fileSizeCountTask.process(omUpdateEventBatch);
+    fileSizeCountTaskOBS.process(omUpdateEventBatch);
+    fileSizeCountTaskFSO.process(omUpdateEventBatch);
 
     assertEquals(10000, fileCountBySizeDao.count());
     recordToFind = dslContext
@@ -488,4 +496,34 @@ public void testProcessAtScale() {
         .getCount().longValue());
   }
 
+
+  @Test
+  public void testTruncateTableExceptionPropagation() {
+    // Mock DSLContext and FileCountBySizeDao
+    DSLContext mockDslContext = mock(DSLContext.class);
+    FileCountBySizeDao mockDao = mock(FileCountBySizeDao.class);
+
+    // Mock schema definition and ensure it returns our mocked DSLContext
+    UtilizationSchemaDefinition mockSchema = mock(UtilizationSchemaDefinition.class);
+    when(mockSchema.getDSLContext()).thenReturn(mockDslContext);
+
+    // Mock delete operation to throw an exception
+    when(mockDslContext.delete(FILE_COUNT_BY_SIZE))
+        .thenThrow(new RuntimeException("Simulated DB failure"));
+
+    // Create instances of FileSizeCountTaskOBS and FileSizeCountTaskFSO using mocks
+    fileSizeCountTaskOBS = new FileSizeCountTaskOBS(mockDao, mockSchema);
+    fileSizeCountTaskFSO = new FileSizeCountTaskFSO(mockDao, mockSchema);
+
+    // Mock OMMetadataManager
+    OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+
+    // Verify that an exception is thrown from reprocess() for both tasks.
+    assertThrows(RuntimeException.class, () -> fileSizeCountTaskOBS.reprocess(omMetadataManager),
+        "Expected reprocess to propagate exception but it didn't.");
+
+    assertThrows(RuntimeException.class, () -> fileSizeCountTaskFSO.reprocess(omMetadataManager),
+        "Expected reprocess to propagate exception but it didn't.");
+  }
+
 }

