This is an automated email from the ASF dual-hosted git repository.

arafat2198 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 393211a6e8 HDDS-12062. Recon - Error handling in NSSummaryTask to avoid data inconsistencies. (#7723)
393211a6e8 is described below

commit 393211a6e8b4a591c729f3dd2a23320baa60f132
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Mon Feb 24 18:10:24 2025 +0530

    HDDS-12062. Recon - Error handling in NSSummaryTask to avoid data inconsistencies. (#7723)
---
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |   2 +
 .../ozone/recon/tasks/ContainerKeyMapperTask.java  |  21 +-
 .../ozone/recon/tasks/FileSizeCountTask.java       | 333 ---------------------
 .../ozone/recon/tasks/FileSizeCountTaskFSO.java    |   6 +-
 .../ozone/recon/tasks/FileSizeCountTaskHelper.java |  37 ++-
 .../ozone/recon/tasks/FileSizeCountTaskOBS.java    |   6 +-
 .../hadoop/ozone/recon/tasks/NSSummaryTask.java    | 102 +++++--
 .../recon/tasks/NSSummaryTaskDbEventHandler.java   |  20 +-
 .../ozone/recon/tasks/NSSummaryTaskWithFSO.java    |  45 ++-
 .../ozone/recon/tasks/NSSummaryTaskWithLegacy.java |  41 ++-
 .../ozone/recon/tasks/NSSummaryTaskWithOBS.java    |  44 ++-
 .../ozone/recon/tasks/OmTableInsightTask.java      |  20 +-
 .../hadoop/ozone/recon/tasks/ReconOmTask.java      | 110 ++++++-
 .../ozone/recon/tasks/ReconTaskControllerImpl.java |  68 +++--
 .../ozone/recon/api/TestContainerEndpoint.java     |   4 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |  14 +-
 .../recon/api/TestNSSummaryDiskUsageOrdering.java  |   2 +-
 .../recon/api/TestNSSummaryEndpointWithFSO.java    |   2 +-
 .../recon/api/TestNSSummaryEndpointWithLegacy.java |   2 +-
 .../api/TestNSSummaryEndpointWithOBSAndLegacy.java |   4 +-
 .../ozone/recon/api/TestOmDBInsightEndPoint.java   |   6 +-
 .../recon/api/TestOpenKeysSearchEndpoint.java      |   2 +-
 .../hadoop/ozone/recon/tasks/DummyReconDBTask.java |  16 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |   6 +-
 .../ozone/recon/tasks/TestFileSizeCountTask.java   |  34 +--
 .../ozone/recon/tasks/TestNSSummaryTask.java       |   3 +-
 .../recon/tasks/TestNSSummaryTaskWithFSO.java      | 108 ++++++-
 .../recon/tasks/TestNSSummaryTaskWithLegacy.java   |  13 +-
 .../TestNSSummaryTaskWithLegacyOBSLayout.java      |  10 +-
 .../recon/tasks/TestNSSummaryTaskWithOBS.java      |  16 +-
 .../ozone/recon/tasks/TestOmTableInsightTask.java  |  42 +--
 .../recon/tasks/TestReconTaskControllerImpl.java   |  14 +-
 32 files changed, 574 insertions(+), 579 deletions(-)

diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index abe85e4763..a096268337 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -501,7 +501,9 @@ ImmutablePair<Boolean, Long> innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc
       }
       for (byte[] data : dbUpdates.getData()) {
         try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(data)) {
+          // Events get populated in the events list in OMDBUpdatesHandler via callbacks for put/delete/update.
           writeBatch.iterate(omdbUpdatesHandler);
+          // Commit the OM DB transactions to the Recon RocksDB and sync here.
           try (RDBBatchOperation rdbBatchOperation =
                    new RDBBatchOperation(writeBatch)) {
             try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
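
Note on the hunk above: the two added comments describe a two-phase apply. The
RocksDB write batch is first replayed through OMDBUpdatesHandler, whose
put/delete/update callbacks buffer the events that the Recon tasks will later
consume; the same batch is then committed to Recon's local RocksDB. A condensed
sketch of that shape, using only the calls visible in this hunk (the actual
commit-and-sync call falls outside the context shown):

    for (byte[] data : dbUpdates.getData()) {
      try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(data)) {
        // Phase 1: replay the batch; handler callbacks collect update events.
        writeBatch.iterate(omdbUpdatesHandler);
        // Phase 2: commit the very same batch to Recon's RocksDB copy.
        try (RDBBatchOperation rdbBatchOperation =
                 new RDBBatchOperation(writeBatch)) {
          try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
            // ... commit with wOpts and sync (elided in this hunk) ...
          }
        }
      }
    }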
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
index 6e6390d324..e42e021b9e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
@@ -34,8 +34,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -82,7 +80,7 @@ public ContainerKeyMapperTask(ReconContainerMetadataManager
    * (container, key) -&gt; count to Recon Container DB.
    */
   @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
     long omKeyCount = 0;
 
     // In-memory maps for fast look up and batch write
@@ -118,7 +116,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
                 containerKeyCountMap);
             if (!checkAndCallFlushToDB(containerKeyMap)) {
               LOG.error("Unable to flush containerKey information to the DB");
-              return new ImmutablePair<>(getTaskName(), false);
+              return buildTaskResult(false);
             }
             omKeyCount++;
           }
@@ -131,7 +129,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
           containerKeyCountMap)) {
         LOG.error("Unable to flush Container Key Count and " +
             "remaining Container Key information to the DB");
-        return new ImmutablePair<>(getTaskName(), false);
+        return buildTaskResult(false);
       }
 
       LOG.debug("Completed 'reprocess' of ContainerKeyMapperTask.");
@@ -142,9 +140,9 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
     } catch (IOException ioEx) {
       LOG.error("Unable to populate Container Key data in Recon DB. ",
           ioEx);
-      return new ImmutablePair<>(getTaskName(), false);
+      return buildTaskResult(false);
     }
-    return new ImmutablePair<>(getTaskName(), true);
+    return buildTaskResult(true);
   }
 
   private boolean flushAndCommitContainerKeyInfoToDB(
@@ -189,7 +187,8 @@ public Collection<String> getTaskTables() {
   }
 
   @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public TaskResult process(OMUpdateEventBatch events,
+                            Map<String, Integer> subTaskSeekPosMap) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
     int eventCount = 0;
     final Collection<String> taskTables = getTaskTables();
@@ -246,18 +245,18 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
       } catch (IOException e) {
         LOG.error("Unexpected exception while updating key data : {} ",
             updatedKey, e);
-        return new ImmutablePair<>(getTaskName(), false);
+        return buildTaskResult(false);
       }
     }
     try {
       writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList);
     } catch (IOException e) {
       LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
-      return new ImmutablePair<>(getTaskName(), false);
+      return buildTaskResult(false);
     }
     LOG.debug("{} successfully processed {} OM DB update event(s) in {} 
milliseconds.",
         getTaskName(), eventCount, (System.currentTimeMillis() - startTime));
-    return new ImmutablePair<>(getTaskName(), true);
+    return buildTaskResult(true);
   }
 
   private void writeToTheDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
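
With this change, every Recon OM task implements the reworked ReconOmTask
interface. A skeletal implementation is sketched below for orientation; it is
an assumed example, not part of this commit, and buildTaskResult(boolean)
refers to the helper that the tasks in this diff call for single-subtask
results:

    public class MyReconTask implements ReconOmTask {
      @Override
      public String getTaskName() {
        return "MyReconTask";
      }

      @Override
      public TaskResult process(OMUpdateEventBatch events,
                                Map<String, Integer> subTaskSeekPosMap) {
        // Consume events, then report success plus any seek positions.
        return buildTaskResult(true);
      }

      @Override
      public TaskResult reprocess(OMMetadataManager omMetadataManager) {
        // Rebuild state from the full OM snapshot.
        return buildTaskResult(true);
      }
    }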
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
deleted file mode 100644
index 67da0d6c78..0000000000
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.recon.tasks;
-
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
-
-import com.google.inject.Inject;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.recon.ReconUtils;
-import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
-import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
-import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
-import org.jooq.DSLContext;
-import org.jooq.Record3;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class to iterate over the OM DB and store the counts of existing/new
- * files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon
- * fileSize DB.
- */
-public class FileSizeCountTask implements ReconOmTask {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(FileSizeCountTask.class);
-
-  private FileCountBySizeDao fileCountBySizeDao;
-  private DSLContext dslContext;
-
-  @Inject
-  public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
-                           UtilizationSchemaDefinition
-                               utilizationSchemaDefinition) {
-    this.fileCountBySizeDao = fileCountBySizeDao;
-    this.dslContext = utilizationSchemaDefinition.getDSLContext();
-  }
-
-  /**
-   * Read the Keys from OM snapshot DB and calculate the upper bound of
-   * File Size it belongs to.
-   *
-   * @param omMetadataManager OM Metadata instance.
-   * @return Pair
-   */
-  @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
-    // Map to store the count of files based on file size
-    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
-
-    // Delete all records from FILE_COUNT_BY_SIZE table
-    int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
-    LOG.debug("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);
-
-    // Call reprocessBucket method for FILE_SYSTEM_OPTIMIZED bucket layout
-    boolean statusFSO =
-        reprocessBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED,
-            omMetadataManager,
-            fileSizeCountMap);
-    // Call reprocessBucket method for LEGACY bucket layout
-    boolean statusOBS =
-        reprocessBucketLayout(BucketLayout.LEGACY, omMetadataManager,
-            fileSizeCountMap);
-    if (!statusFSO && !statusOBS) {
-      return new ImmutablePair<>(getTaskName(), false);
-    }
-    writeCountsToDB(fileSizeCountMap);
-    LOG.debug("Completed a 'reprocess' run of FileSizeCountTask.");
-    return new ImmutablePair<>(getTaskName(), true);
-  }
-
-  private boolean reprocessBucketLayout(BucketLayout bucketLayout,
-                               OMMetadataManager omMetadataManager,
-                               Map<FileSizeCountKey, Long> fileSizeCountMap) {
-    Table<String, OmKeyInfo> omKeyInfoTable =
-        omMetadataManager.getKeyTable(bucketLayout);
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
-             keyIter = omKeyInfoTable.iterator()) {
-      while (keyIter.hasNext()) {
-        Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-        handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
-        //  The time complexity of .size() method is constant time, O(1)
-        if (fileSizeCountMap.size() >= 100000) {
-          writeCountsToDB(fileSizeCountMap);
-          fileSizeCountMap.clear();
-        }
-      }
-    } catch (IOException ioEx) {
-      LOG.error("Unable to populate File Size Count for " + bucketLayout +
-          " in Recon DB. ", ioEx);
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public String getTaskName() {
-    return "FileSizeCountTask";
-  }
-
-  public Collection<String> getTaskTables() {
-    List<String> taskTables = new ArrayList<>();
-    taskTables.add(KEY_TABLE);
-    taskTables.add(FILE_TABLE);
-    return taskTables;
-  }
-
-  /**
-   * Read the Keys from update events and update the count of files
-   * pertaining to a certain upper bound.
-   *
-   * @param events Update events - PUT/DELETE.
-   * @return Pair
-   */
-  @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
-    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
-    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
-    final Collection<String> taskTables = getTaskTables();
-
-    long startTime = System.currentTimeMillis();
-    while (eventIterator.hasNext()) {
-      OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
-      // Filter event inside process method to avoid duping
-      if (!taskTables.contains(omdbUpdateEvent.getTable())) {
-        continue;
-      }
-      String updatedKey = omdbUpdateEvent.getKey();
-      Object value = omdbUpdateEvent.getValue();
-      Object oldValue = omdbUpdateEvent.getOldValue();
-
-      if (value instanceof OmKeyInfo) {
-        OmKeyInfo omKeyInfo = (OmKeyInfo) value;
-        OmKeyInfo omKeyInfoOld = (OmKeyInfo) oldValue;
-
-        try {
-          switch (omdbUpdateEvent.getAction()) {
-          case PUT:
-            handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
-            break;
-
-          case DELETE:
-            handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
-            break;
-
-          case UPDATE:
-            if (omKeyInfoOld != null) {
-              handleDeleteKeyEvent(updatedKey, omKeyInfoOld, fileSizeCountMap);
-              handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
-            } else {
-              LOG.warn("Update event does not have the old keyInfo for {}.",
-                  updatedKey);
-            }
-            break;
-
-          default:
-            LOG.trace("Skipping DB update event : {}",
-                omdbUpdateEvent.getAction());
-          }
-        } catch (Exception e) {
-          LOG.error("Unexpected exception while processing key {}.",
-              updatedKey, e);
-          return new ImmutablePair<>(getTaskName(), false);
-        }
-      } else {
-        LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
-            value.getClass().getName(), updatedKey);
-      }
-    }
-    writeCountsToDB(fileSizeCountMap);
-    LOG.debug("{} successfully processed in {} milliseconds",
-        getTaskName(), (System.currentTimeMillis() - startTime));
-    return new ImmutablePair<>(getTaskName(), true);
-  }
-
-  /**
-   * Populate DB with the counts of file sizes calculated
-   * using the dao.
-   *
-   */
-  private void writeCountsToDB(Map<FileSizeCountKey, Long> fileSizeCountMap) {
-
-    List<FileCountBySize> insertToDb = new ArrayList<>();
-    List<FileCountBySize> updateInDb = new ArrayList<>();
-    boolean isDbTruncated = isFileCountBySizeTableEmpty(); // Check if table is empty
-
-    fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
-      FileCountBySize newRecord = new FileCountBySize();
-      newRecord.setVolume(key.volume);
-      newRecord.setBucket(key.bucket);
-      newRecord.setFileSize(key.fileSizeUpperBound);
-      newRecord.setCount(fileSizeCountMap.get(key));
-      if (!isDbTruncated) {
-        // Get the current count from database and update
-        Record3<String, String, Long> recordToFind =
-            dslContext.newRecord(
-                FILE_COUNT_BY_SIZE.VOLUME,
-                FILE_COUNT_BY_SIZE.BUCKET,
-                FILE_COUNT_BY_SIZE.FILE_SIZE)
-                .value1(key.volume)
-                .value2(key.bucket)
-                .value3(key.fileSizeUpperBound);
-        FileCountBySize fileCountRecord =
-            fileCountBySizeDao.findById(recordToFind);
-        if (fileCountRecord == null && newRecord.getCount() > 0L) {
-          // insert new row only for non-zero counts.
-          insertToDb.add(newRecord);
-        } else if (fileCountRecord != null) {
-          newRecord.setCount(fileCountRecord.getCount() +
-              fileSizeCountMap.get(key));
-          updateInDb.add(newRecord);
-        }
-      } else if (newRecord.getCount() > 0) {
-        // insert new row only for non-zero counts.
-        insertToDb.add(newRecord);
-      }
-    });
-    fileCountBySizeDao.insert(insertToDb);
-    fileCountBySizeDao.update(updateInDb);
-  }
-
-  private FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
-    return new FileSizeCountKey(omKeyInfo.getVolumeName(),
-        omKeyInfo.getBucketName(),
-            ReconUtils.getFileSizeUpperBound(omKeyInfo.getDataSize()));
-  }
-
-  /**
-   * Calculate and update the count of files being tracked by
-   * fileSizeCountMap.
-   * Used by reprocess() and process().
-   *
-   * @param omKeyInfo OmKey being updated for count
-   */
-  private void handlePutKeyEvent(OmKeyInfo omKeyInfo,
-                                 Map<FileSizeCountKey, Long> fileSizeCountMap) {
-    FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
-    Long count = fileSizeCountMap.containsKey(key) ?
-        fileSizeCountMap.get(key) + 1L : 1L;
-    fileSizeCountMap.put(key, count);
-  }
-
-  private BucketLayout getBucketLayout() {
-    return BucketLayout.DEFAULT;
-  }
-
-  /**
-   * Calculate and update the count of files being tracked by
-   * fileSizeCountMap.
-   * Used by reprocess() and process().
-   *
-   * @param omKeyInfo OmKey being updated for count
-   */
-  private void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
-                                    Map<FileSizeCountKey, Long>
-                                        fileSizeCountMap) {
-    if (omKeyInfo == null) {
-      LOG.warn("Deleting a key not found while handling DELETE key event. Key" 
+
-          " not found in Recon OM DB : {}", key);
-    } else {
-      FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
-      Long count = fileSizeCountMap.containsKey(countKey) ?
-          fileSizeCountMap.get(countKey) - 1L : -1L;
-      fileSizeCountMap.put(countKey, count);
-    }
-  }
-
-  /**
-   * Checks if the FILE_COUNT_BY_SIZE table is empty.
-   *
-   * @return true if the table is empty, false otherwise.
-   */
-  private boolean isFileCountBySizeTableEmpty() {
-    return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0;
-  }
-
-  private static class FileSizeCountKey {
-    private String volume;
-    private String bucket;
-    private Long fileSizeUpperBound;
-
-    FileSizeCountKey(String volume, String bucket,
-                     Long fileSizeUpperBound) {
-      this.volume = volume;
-      this.bucket = bucket;
-      this.fileSizeUpperBound = fileSizeUpperBound;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof FileSizeCountKey) {
-        FileSizeCountKey s = (FileSizeCountKey) obj;
-        return volume.equals(s.volume) && bucket.equals(s.bucket) &&
-            fileSizeUpperBound.equals(s.fileSizeUpperBound);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return (volume  + bucket + fileSizeUpperBound).hashCode();
-    }
-  }
-}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
index f40a859590..a411444780 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.recon.tasks;
 
 import com.google.inject.Inject;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Map;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -42,7 +42,7 @@ public FileSizeCountTaskFSO(FileCountBySizeDao fileCountBySizeDao,
   }
 
   @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
     return FileSizeCountTaskHelper.reprocess(
         omMetadataManager,
         dslContext,
@@ -53,7 +53,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
   }
 
   @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public TaskResult process(OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
     // This task listens only on the FILE_TABLE.
     return FileSizeCountTaskHelper.processEvents(
         events,
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
index 489449d6a9..406ad2e953 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
@@ -25,8 +25,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -85,11 +83,11 @@ public static void truncateTableIfNeeded(DSLContext dslContext) {
    * @param taskName           The name of the task for logging.
    * @return A Pair of task name and boolean indicating success.
    */
-  public static Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager,
-                                                DSLContext dslContext,
-                                                FileCountBySizeDao fileCountBySizeDao,
-                                                BucketLayout bucketLayout,
-                                                String taskName) {
+  public static ReconOmTask.TaskResult reprocess(OMMetadataManager omMetadataManager,
+                                                 DSLContext dslContext,
+                                                 FileCountBySizeDao fileCountBySizeDao,
+                                                 BucketLayout bucketLayout,
+                                                 String taskName) {
     LOG.info("Starting Reprocess for {}", taskName);
     Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
     long startTime = System.currentTimeMillis();
@@ -97,12 +95,12 @@ public static Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManage
     boolean status = reprocessBucketLayout(
        bucketLayout, omMetadataManager, fileSizeCountMap, dslContext, fileCountBySizeDao, taskName);
     if (!status) {
-      return new ImmutablePair<>(taskName, false);
+      return buildTaskResult(taskName, false);
     }
     writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
     long endTime = System.currentTimeMillis();
     LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime - 
startTime));
-    return new ImmutablePair<>(taskName, true);
+    return buildTaskResult(taskName, true);
   }
 
   /**
@@ -155,11 +153,11 @@ public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
    * @param taskName           The name of the task for logging.
    * @return A Pair of task name and boolean indicating success.
    */
-  public static Pair<String, Boolean> processEvents(OMUpdateEventBatch events,
-                                                    String tableName,
-                                                    DSLContext dslContext,
-                                                    FileCountBySizeDao fileCountBySizeDao,
-                                                    String taskName) {
+  public static ReconOmTask.TaskResult processEvents(OMUpdateEventBatch events,
+                                                     String tableName,
+                                                     DSLContext dslContext,
+                                                     FileCountBySizeDao fileCountBySizeDao,
+                                                     String taskName) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
     Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
     long startTime = System.currentTimeMillis();
@@ -195,7 +193,7 @@ public static Pair<String, Boolean> processEvents(OMUpdateEventBatch events,
           }
         } catch (Exception e) {
           LOG.error("Unexpected exception while processing key {}.", 
updatedKey, e);
-          return new ImmutablePair<>(taskName, false);
+          return buildTaskResult(taskName, false);
         }
       } else {
         LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
@@ -205,7 +203,7 @@ public static Pair<String, Boolean> processEvents(OMUpdateEventBatch events,
     writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
     LOG.debug("{} successfully processed in {} milliseconds", taskName,
         (System.currentTimeMillis() - startTime));
-    return new ImmutablePair<>(taskName, true);
+    return buildTaskResult(taskName, true);
   }
 
   /**
@@ -328,4 +326,11 @@ public int hashCode() {
       return (volume + bucket + fileSizeUpperBound).hashCode();
     }
   }
+
+  public static ReconOmTask.TaskResult buildTaskResult(String taskName, boolean success) {
+    return new ReconOmTask.TaskResult.Builder()
+        .setTaskName(taskName)
+        .setTaskSuccess(success)
+        .build();
+  }
 }
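
For readers skimming the diff, a minimal sketch of how the new helper is
consumed. This is an assumed call site, not code from this commit; the class
and method names come from the hunks above:

    // Hypothetical call site inside a FileSizeCount subtask:
    ReconOmTask.TaskResult result =
        FileSizeCountTaskHelper.buildTaskResult("FileSizeCountTaskOBS", true);
    result.getTaskName();                        // "FileSizeCountTaskOBS"
    result.isTaskSuccess();                      // true
    result.getSubTaskSeekPositions().isEmpty();  // defaults to an empty map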
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
index acaab763ac..05cd0e1669 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.recon.tasks;
 
 import com.google.inject.Inject;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Map;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -42,7 +42,7 @@ public FileSizeCountTaskOBS(FileCountBySizeDao fileCountBySizeDao,
   }
 
   @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
     return FileSizeCountTaskHelper.reprocess(
         omMetadataManager,
         dslContext,
@@ -53,7 +53,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
   }
 
   @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public TaskResult process(OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
     // This task listens only on the KEY_TABLE.
     return FileSizeCountTaskHelper.processEvents(
         events,
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
index 2150800350..aa2a94caf1 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
@@ -17,11 +17,16 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -30,7 +35,6 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import javax.inject.Inject;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -39,6 +43,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Task to query data from OMDB and write into Recon RocksDB.
  * Reprocess() will take a snapshots on OMDB, and iterate the keyTable,
@@ -69,7 +74,6 @@ public class NSSummaryTask implements ReconOmTask {
   private final NSSummaryTaskWithFSO nsSummaryTaskWithFSO;
   private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy;
   private final NSSummaryTaskWithOBS nsSummaryTaskWithOBS;
-  private final OzoneConfiguration ozoneConfiguration;
 
   @Inject
   public NSSummaryTask(ReconNamespaceSummaryManager
@@ -80,16 +84,19 @@ public NSSummaryTask(ReconNamespaceSummaryManager
                        ozoneConfiguration) {
     this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
     this.reconOMMetadataManager = reconOMMetadataManager;
-    this.ozoneConfiguration = ozoneConfiguration;
+    long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
+
     this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconNamespaceSummaryManager, reconOMMetadataManager,
+        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
     this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconNamespaceSummaryManager, reconOMMetadataManager,
+        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
     this.nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconNamespaceSummaryManager, reconOMMetadataManager,
+        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
   }
 
   @Override
@@ -97,28 +104,68 @@ public String getTaskName() {
     return "NSSummaryTask";
   }
 
+  /**
+   * Bucket type enum whose values mimic the subtasks used for data processing.
+   */
+  public enum BucketType {
+    FSO("File System Optimized Bucket"),
+    OBS("Object Store Bucket"),
+    LEGACY("Legacy Bucket");
+
+    private final String description;
+
+    BucketType(String description) {
+      this.description = description;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+  }
+
   @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
-    long startTime = System.currentTimeMillis();
-    boolean success = nsSummaryTaskWithFSO.processWithFSO(events);
-    if (!success) {
+  public TaskResult process(
+      OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
+    boolean anyFailure = false; // Track if any bucket fails
+    Map<String, Integer> updatedSeekPositions = new HashMap<>();
+
+    // Process FSO bucket
+    Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0);
+    Pair<Integer, Boolean> bucketResult = nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek);
+    updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft());
+    if (!bucketResult.getRight()) {
       LOG.error("processWithFSO failed.");
+      anyFailure = true;
     }
-    success = nsSummaryTaskWithLegacy.processWithLegacy(events);
-    if (!success) {
+
+    // Process Legacy bucket
+    bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
+    bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events, bucketSeek);
+    updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft());
+    if (!bucketResult.getRight()) {
       LOG.error("processWithLegacy failed.");
+      anyFailure = true;
     }
-    success = nsSummaryTaskWithOBS.processWithOBS(events);
-    if (!success) {
+
+    // Process OBS bucket
+    bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
+    bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek);
+    updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft());
+    if (!bucketResult.getRight()) {
       LOG.error("processWithOBS failed.");
+      anyFailure = true;
     }
-    LOG.debug("{} successfully processed in {} milliseconds",
-        getTaskName(), (System.currentTimeMillis() - startTime));
-    return new ImmutablePair<>(getTaskName(), success);
+
+    // Return task failure if any bucket failed, while keeping each bucket's latest seek position.
+    return new TaskResult.Builder()
+        .setTaskName(getTaskName())
+        .setSubTaskSeekPositions(updatedSeekPositions)
+        .setTaskSuccess(!anyFailure)
+        .build();
   }
 
   @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
     // Initialize a list of tasks to run in parallel
     Collection<Callable<Boolean>> tasks = new ArrayList<>();
 
@@ -130,7 +177,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
     } catch (IOException ioEx) {
       LOG.error("Unable to clear NSSummary table in Recon DB. ",
           ioEx);
-      return new ImmutablePair<>(getTaskName(), false);
+      return buildTaskResult(false);
     }
 
     tasks.add(() -> nsSummaryTaskWithFSO
@@ -150,15 +197,12 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
       results = executorService.invokeAll(tasks);
       for (int i = 0; i < results.size(); i++) {
         if (results.get(i).get().equals(false)) {
-          return new ImmutablePair<>(getTaskName(), false);
+          return buildTaskResult(false);
         }
       }
-    } catch (InterruptedException ex) {
+    } catch (InterruptedException | ExecutionException ex) {
       LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex);
-      return new ImmutablePair<>(getTaskName(), false);
-    } catch (ExecutionException ex2) {
-      LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex2);
-      return new ImmutablePair<>(getTaskName(), false);
+      return buildTaskResult(false);
     } finally {
       executorService.shutdown();
 
@@ -171,7 +215,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
       LOG.debug("Task execution time: {} milliseconds", durationInMillis);
     }
 
-    return new ImmutablePair<>(getTaskName(), true);
+    return buildTaskResult(true);
   }
 
 }
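
For context, a hedged sketch of the retry contract these per-bucket seek
positions enable. The controller-side loop below is an assumption (the real
driving logic lives in ReconTaskControllerImpl and may differ); the method
names come from this diff:

    // Assumed retry loop: re-drive process() from each bucket's last flushed
    // position instead of replaying the whole event batch from the start.
    Map<String, Integer> seekPosMap = new HashMap<>();
    ReconOmTask.TaskResult result = nsSummaryTask.process(events, seekPosMap);
    if (!result.isTaskSuccess()) {
      // FSO/LEGACY/OBS subtasks resume right after their last durable flush.
      result = nsSummaryTask.process(events, result.getSubTaskSeekPositions());
    }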
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
index da223665c3..4b0b851490 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
@@ -71,15 +71,16 @@ public ReconOMMetadataManager getReconOMMetadataManager() {
   protected void writeNSSummariesToDB(Map<Long, NSSummary> nsSummaryMap)
       throws IOException {
     try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
-      nsSummaryMap.keySet().forEach((Long key) -> {
+      for (Map.Entry<Long, NSSummary> entry : nsSummaryMap.entrySet()) {
         try {
           reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation,
-              key, nsSummaryMap.get(key));
+              entry.getKey(), entry.getValue());
         } catch (IOException e) {
           LOG.error("Unable to write Namespace Summary data in Recon DB.",
               e);
+          throw e;
         }
-      });
+      }
       reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
     }
   }
@@ -201,20 +202,11 @@ protected void handleDeleteDirEvent(OmDirectoryInfo directoryInfo,
   protected boolean flushAndCommitNSToDB(Map<Long, NSSummary> nsSummaryMap) {
     try {
       writeNSSummariesToDB(nsSummaryMap);
-      nsSummaryMap.clear();
     } catch (IOException e) {
       LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
       return false;
-    }
-    return true;
-  }
-
-  protected boolean checkAndCallFlushToDB(
-      Map<Long, NSSummary> nsSummaryMap) {
-    // if map contains more than entries, flush to DB and clear the map
-    if (null != nsSummaryMap && nsSummaryMap.size() >=
-        nsSummaryFlushToDBMaxThreshold) {
-      return flushAndCommitNSToDB(nsSummaryMap);
+    } finally {
+      nsSummaryMap.clear();
     }
     return true;
   }
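
The net effect of the two hunks above: writeNSSummariesToDB now rethrows on the
first failed entry, so a partially built batch is never committed, and
flushAndCommitNSToDB clears the in-memory map whether the flush succeeds or
fails. A self-contained sketch of that contract, where write() is a
hypothetical stand-in for writeNSSummariesToDB:

    // Flush-then-clear: on failure the caller gets 'false' and the buffered
    // state is discarded, so a retry re-derives it from its seek position.
    static boolean flush(Map<Long, NSSummary> nsSummaryMap) {
      try {
        write(nsSummaryMap);   // hypothetical; throws IOException on failure
      } catch (IOException e) {
        return false;          // caller records failure and its seek position
      } finally {
        nsSummaryMap.clear();  // never retain partially flushed state
      }
      return true;
    }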
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
index 030574e0d6..6ebc36331a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
@@ -26,6 +26,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -47,14 +49,18 @@ public class NSSummaryTaskWithFSO extends NSSummaryTaskDbEventHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(NSSummaryTaskWithFSO.class);
 
+  private final long nsSummaryFlushToDBMaxThreshold;
+
   public NSSummaryTaskWithFSO(ReconNamespaceSummaryManager
                               reconNamespaceSummaryManager,
                               ReconOMMetadataManager
                               reconOMMetadataManager,
                               OzoneConfiguration
-                              ozoneConfiguration) {
+                              ozoneConfiguration,
+                              long nsSummaryFlushToDBMaxThreshold) {
     super(reconNamespaceSummaryManager,
         reconOMMetadataManager, ozoneConfiguration);
+    this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
   }
 
  // We only listen to updates from FSO-enabled KeyTable(FileTable) and DirTable
@@ -62,15 +68,23 @@ public Collection<String> getTaskTables() {
     return Arrays.asList(FILE_TABLE, DIRECTORY_TABLE);
   }
 
-  public boolean processWithFSO(OMUpdateEventBatch events) {
+  public Pair<Integer, Boolean> processWithFSO(OMUpdateEventBatch events,
+                                               int seekPos) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    int itrPos = 0;
+    while (eventIterator.hasNext() && itrPos < seekPos) {
+      eventIterator.next();
+      itrPos++;
+    }
     final Collection<String> taskTables = getTaskTables();
     Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
+    int eventCounter = 0;
 
     while (eventIterator.hasNext()) {
       OMDBUpdateEvent<String, ? extends
               WithParentObjectId> omdbUpdateEvent = eventIterator.next();
       OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
+      eventCounter++;
 
       // we only process updates on OM's FileTable and Dirtable
       String table = omdbUpdateEvent.getTable();
@@ -149,20 +163,23 @@ public boolean processWithFSO(OMUpdateEventBatch events) {
       } catch (IOException ioEx) {
         LOG.error("Unable to process Namespace Summary data in Recon DB. ",
                 ioEx);
-        return false;
+        nsSummaryMap.clear();
+        return new ImmutablePair<>(seekPos, false);
       }
-      if (!checkAndCallFlushToDB(nsSummaryMap)) {
-        return false;
+      if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+        if (!flushAndCommitNSToDB(nsSummaryMap)) {
+          return new ImmutablePair<>(seekPos, false);
+        }
+        seekPos = eventCounter + 1;
       }
     }
 
     // flush and commit left out entries at end
     if (!flushAndCommitNSToDB(nsSummaryMap)) {
-      return false;
+      return new ImmutablePair<>(seekPos, false);
     }
-
     LOG.debug("Completed a process run of NSSummaryTaskWithFSO");
-    return true;
+    return new ImmutablePair<>(seekPos, true);
   }
 
   public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
@@ -178,8 +195,10 @@ public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
           Table.KeyValue<String, OmDirectoryInfo> kv = dirTableIter.next();
           OmDirectoryInfo directoryInfo = kv.getValue();
           handlePutDirEvent(directoryInfo, nsSummaryMap);
-          if (!checkAndCallFlushToDB(nsSummaryMap)) {
-            return false;
+          if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+            if (!flushAndCommitNSToDB(nsSummaryMap)) {
+              return false;
+            }
           }
         }
       }
@@ -194,8 +213,10 @@ public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
           Table.KeyValue<String, OmKeyInfo> kv = keyTableIter.next();
           OmKeyInfo keyInfo = kv.getValue();
           handlePutKeyEvent(keyInfo, nsSummaryMap);
-          if (!checkAndCallFlushToDB(nsSummaryMap)) {
-            return false;
+          if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+            if (!flushAndCommitNSToDB(nsSummaryMap)) {
+              return false;
+            }
           }
         }
       }
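
The seek/flush bookkeeping above maintains one invariant: the returned seek
position always points just past the last event whose effects were durably
flushed, so a failed run can resume without double-counting. A simplified,
self-contained illustration of the pattern (plain Java collections; the
commit's own counters are kept relative to the skipped prefix):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    static int processFrom(List<String> events, int seekPos) {
      Iterator<String> it = events.iterator();
      int pos = 0;
      while (it.hasNext() && pos < seekPos) {  // skip what a prior run flushed
        it.next();
        pos++;
      }
      List<String> buffer = new ArrayList<>();
      while (it.hasNext()) {
        buffer.add(it.next());
        pos++;
        if (buffer.size() >= 3) {   // stands in for the flush threshold
          // flushAndCommitNSToDB(buffer) would run here; assume it succeeded.
          buffer.clear();
          seekPos = pos;            // everything up to 'pos' is now durable
        }
      }
      // A final flush of the remainder would advance seekPos to events.size().
      return seekPos;
    }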
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
index cf29f23813..a146003917 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
@@ -25,6 +25,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -51,24 +53,35 @@ public class NSSummaryTaskWithLegacy extends NSSummaryTaskDbEventHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(NSSummaryTaskWithLegacy.class);
 
-  private boolean enableFileSystemPaths;
+  private final boolean enableFileSystemPaths;
+  private final long nsSummaryFlushToDBMaxThreshold;
 
   public NSSummaryTaskWithLegacy(ReconNamespaceSummaryManager
                                  reconNamespaceSummaryManager,
                                  ReconOMMetadataManager
                                  reconOMMetadataManager,
                                  OzoneConfiguration
-                                 ozoneConfiguration) {
+                                 ozoneConfiguration,
+                                 long nsSummaryFlushToDBMaxThreshold) {
     super(reconNamespaceSummaryManager,
         reconOMMetadataManager, ozoneConfiguration);
     // true if FileSystemPaths enabled
     enableFileSystemPaths = ozoneConfiguration
         .getBoolean(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS,
             OmConfig.Defaults.ENABLE_FILESYSTEM_PATHS);
+    this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
   }
 
-  public boolean processWithLegacy(OMUpdateEventBatch events) {
+  public Pair<Integer, Boolean> processWithLegacy(OMUpdateEventBatch events,
+                                                  int seekPos) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    int itrPos = 0;
+    while (eventIterator.hasNext() && itrPos < seekPos) {
+      eventIterator.next();
+      itrPos++;
+    }
+
+    int eventCounter = 0;
     Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
     ReconOMMetadataManager metadataManager = getReconOMMetadataManager();
 
@@ -76,6 +89,7 @@ public boolean processWithLegacy(OMUpdateEventBatch events) {
       OMDBUpdateEvent<String, ? extends WithParentObjectId> omdbUpdateEvent =
           eventIterator.next();
       OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
+      eventCounter++;
 
       // we only process updates on OM's KeyTable
       String table = omdbUpdateEvent.getTable();
@@ -114,20 +128,24 @@ public boolean processWithLegacy(OMUpdateEventBatch events) {
       } catch (IOException ioEx) {
         LOG.error("Unable to process Namespace Summary data in Recon DB. ",
             ioEx);
-        return false;
+        nsSummaryMap.clear();
+        return new ImmutablePair<>(seekPos, false);
       }
-      if (!checkAndCallFlushToDB(nsSummaryMap)) {
-        return false;
+      if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+        if (!flushAndCommitNSToDB(nsSummaryMap)) {
+          return new ImmutablePair<>(seekPos, false);
+        }
+        seekPos = eventCounter + 1;
       }
     }
 
     // flush and commit left out entries at end
     if (!flushAndCommitNSToDB(nsSummaryMap)) {
-      return false;
+      return new ImmutablePair<>(seekPos, false);
     }
 
     LOG.debug("Completed a process run of NSSummaryTaskWithLegacy");
-    return true;
+    return new ImmutablePair<>(seekPos, true);
   }
 
   private void processWithFileSystemLayout(OmKeyInfo updatedKeyInfo,
@@ -278,14 +296,17 @@ public boolean reprocessWithLegacy(OMMetadataManager omMetadataManager) {
             setParentBucketId(keyInfo);
             handlePutKeyEvent(keyInfo, nsSummaryMap);
           }
-          if (!checkAndCallFlushToDB(nsSummaryMap)) {
-            return false;
+          if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+            if (!flushAndCommitNSToDB(nsSummaryMap)) {
+              return false;
+            }
           }
         }
       }
     } catch (IOException ioEx) {
       LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
           ioEx);
+      nsSummaryMap.clear();
       return false;
     }
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
index 7364639d47..e15cc2836f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
@@ -23,6 +23,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -47,13 +49,17 @@ public class NSSummaryTaskWithOBS extends NSSummaryTaskDbEventHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(NSSummaryTaskWithOBS.class);
 
+  private final long nsSummaryFlushToDBMaxThreshold;
+
 
   public NSSummaryTaskWithOBS(
       ReconNamespaceSummaryManager reconNamespaceSummaryManager,
       ReconOMMetadataManager reconOMMetadataManager,
-      OzoneConfiguration ozoneConfiguration) {
+      OzoneConfiguration ozoneConfiguration,
+      long nsSummaryFlushToDBMaxThreshold) {
     super(reconNamespaceSummaryManager,
         reconOMMetadataManager, ozoneConfiguration);
+    this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
   }
 
 
@@ -89,14 +95,17 @@ public boolean reprocessWithOBS(OMMetadataManager omMetadataManager) {
           setKeyParentID(keyInfo);
 
           handlePutKeyEvent(keyInfo, nsSummaryMap);
-          if (!checkAndCallFlushToDB(nsSummaryMap)) {
-            return false;
+          if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+            if (!flushAndCommitNSToDB(nsSummaryMap)) {
+              return false;
+            }
           }
         }
       }
     } catch (IOException ioEx) {
       LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
           ioEx);
+      nsSummaryMap.clear();
       return false;
     }
 
@@ -108,14 +117,23 @@ public boolean reprocessWithOBS(OMMetadataManager omMetadataManager) {
     return true;
   }
 
-  public boolean processWithOBS(OMUpdateEventBatch events) {
+  public Pair<Integer, Boolean> processWithOBS(OMUpdateEventBatch events,
+                                               int seekPos) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
     Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
 
+    int itrPos = 0;
+    while (eventIterator.hasNext() && itrPos < seekPos) {
+      eventIterator.next();
+      itrPos++;
+    }
+
+    int eventCounter = 0;
     while (eventIterator.hasNext()) {
       OMDBUpdateEvent<String, ? extends WithParentObjectId> omdbUpdateEvent =
           eventIterator.next();
       OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
+      eventCounter++;
 
       // We only process updates on OM's KeyTable
       String table = omdbUpdateEvent.getTable();
@@ -181,27 +199,27 @@ public boolean processWithOBS(OMUpdateEventBatch events) {
         default:
           LOG.debug("Skipping DB update event: {}", action);
         }
-
-        if (!checkAndCallFlushToDB(nsSummaryMap)) {
-          return false;
+        if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+          if (!flushAndCommitNSToDB(nsSummaryMap)) {
+            return new ImmutablePair<>(seekPos, false);
+          }
+          seekPos = eventCounter + 1;
         }
       } catch (IOException ioEx) {
         LOG.error("Unable to process Namespace Summary data in Recon DB. ",
             ioEx);
-        return false;
-      }
-      if (!checkAndCallFlushToDB(nsSummaryMap)) {
-        return false;
+        nsSummaryMap.clear();
+        return new ImmutablePair<>(seekPos, false);
       }
     }
 
     // Flush and commit left-out entries at the end
     if (!flushAndCommitNSToDB(nsSummaryMap)) {
-      return false;
+      return new ImmutablePair<>(seekPos, false);
     }
 
     LOG.debug("Completed a process run of NSSummaryTaskWithOBS");
-    return true;
+    return new ImmutablePair<>(seekPos, true);
   }
 
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
index 72d5906b96..6019990158 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -36,8 +36,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -99,7 +97,7 @@ public void init() {
   /**
    * Iterates the rows of each table in the OM snapshot DB and calculates the
    * counts and sizes for table data.
-   *
+   * <p>
    * For tables that require data size calculation
    * (as returned by getTablesToCalculateSize), both the number of
    * records (count) and total data size of the records are calculated.
@@ -109,7 +107,7 @@ public void init() {
    * @return Pair
    */
   @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
     init();
     for (String tableName : tables) {
       Table table = omMetadataManager.getTable(tableName);
@@ -131,7 +129,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
         }
       } catch (IOException ioEx) {
         LOG.error("Unable to populate Table Count in Recon DB.", ioEx);
-        return new ImmutablePair<>(getTaskName(), false);
+        return buildTaskResult(false);
       }
     }
     // Write the data to the DB
@@ -146,7 +144,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
     }
 
     LOG.debug("Completed a 'reprocess' run of OmTableInsightTask.");
-    return new ImmutablePair<>(getTaskName(), true);
+    return buildTaskResult(true);
   }
 
   @Override
@@ -162,11 +160,13 @@ public Collection<String> getTaskTables() {
    * Read the update events and update the count and sizes of respective object
    * (volume, bucket, key etc.) based on the action (put or delete).
    *
-   * @param events Update events - PUT, DELETE and UPDATE.
+   * @param events            Update events - PUT, DELETE and UPDATE.
+   * @param subTaskSeekPosMap
    * @return Pair
    */
   @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public TaskResult process(OMUpdateEventBatch events,
+                            Map<String, Integer> subTaskSeekPosMap) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
 
     String tableName;
@@ -201,7 +201,7 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
         LOG.error(
             "Unexpected exception while processing the table {}, Action: {}",
             tableName, omdbUpdateEvent.getAction(), e);
-        return new ImmutablePair<>(getTaskName(), false);
+        return buildTaskResult(false);
       }
     }
     // Write the updated count and size information to the database
@@ -216,7 +216,7 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
     }
     LOG.debug("{} successfully processed in {} milliseconds",
         getTaskName(), (System.currentTimeMillis() - startTime));
-    return new ImmutablePair<>(getTaskName(), true);
+    return buildTaskResult(true);
   }
 
   private void handlePutEvent(OMDBUpdateEvent<String, Object> event,
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java
index 01079d529b..395fdf6b1e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java
@@ -17,7 +17,8 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 
 /**
@@ -38,16 +39,109 @@ default void init() { }
 
   /**
    * Process a set of OM events on tables that the task is listening on.
-   * @param events Set of events to be processed by the task.
-   * @return Pair of task name -&gt; task success.
+   *
+   * @param events            The batch of OM update events to be processed.
+   * @param subTaskSeekPosMap A map containing the seek positions for
+   *                          each sub-task, indicating where processing 
should start.
+   * @return A {@link TaskResult} containing:
+   *         - The task name.
+   *         - A map of sub-task names to their respective seek positions.
+   *         - A boolean indicating whether the task was successful.
    */
-  Pair<String, Boolean> process(OMUpdateEventBatch events);
+  TaskResult process(OMUpdateEventBatch events,
+                     Map<String, Integer> subTaskSeekPosMap);
 
   /**
-   * Process a  on tables that the task is listening on.
-   * @param omMetadataManager OM Metadata manager instance.
-   * @return Pair of task name -&gt; task success.
+   * Reprocesses all entries in the Recon OM RocksDB tables the task listens on.
+   *
+   * @param omMetadataManager The OM Metadata Manager instance used for 
accessing metadata.
+   * @return A {@link TaskResult} containing:
+   *         - The task name.
+   *         - A map of sub-task names to their respective seek positions.
+   *         - A boolean indicating whether the task was successful.
    */
-  Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
+  TaskResult reprocess(OMMetadataManager omMetadataManager);
 
+  /**
+   * Represents the result of a task execution, including the task name,
+   * sub-task seek positions, and success status.
+   *
+   * <p>This class is immutable and uses the Builder pattern for object 
creation.</p>
+   */
+  class TaskResult {
+    private final String taskName;
+    private final Map<String, Integer> subTaskSeekPositions;
+    private final boolean taskSuccess;
+
+    /**
+     * Private constructor to enforce the use of the {@link Builder}.
+     *
+     * @param builder The builder instance containing values for 
initialization.
+     */
+    private TaskResult(Builder builder) {
+      this.taskName = builder.taskName;
+      this.subTaskSeekPositions = builder.subTaskSeekPositions != null
+          ? builder.subTaskSeekPositions
+          : Collections.emptyMap(); // Default value
+      this.taskSuccess = builder.taskSuccess;
+    }
+
+    // Getters
+    public String getTaskName() {
+      return taskName;
+    }
+
+    public Map<String, Integer> getSubTaskSeekPositions() {
+      return subTaskSeekPositions;
+    }
+
+    public boolean isTaskSuccess() {
+      return taskSuccess;
+    }
+
+    /**
+     * Builder class for creating instances of {@link TaskResult}.
+     */
+    public static class Builder {
+      private String taskName;
+      private Map<String, Integer> subTaskSeekPositions = 
Collections.emptyMap(); // Default value
+      private boolean taskSuccess;
+
+      public Builder setTaskName(String taskName) {
+        this.taskName = taskName;
+        return this;
+      }
+
+      public Builder setSubTaskSeekPositions(Map<String, Integer> 
subTaskSeekPositions) {
+        this.subTaskSeekPositions = subTaskSeekPositions;
+        return this;
+      }
+
+      public Builder setTaskSuccess(boolean taskSuccess) {
+        this.taskSuccess = taskSuccess;
+        return this;
+      }
+
+      public TaskResult build() {
+        return new TaskResult(this);
+      }
+    }
+
+    // toString method for debugging.
+    @Override
+    public String toString() {
+      return "TaskResult{" +
+          "taskName='" + taskName + '\'' +
+          ", subTaskSeekPositions=" + subTaskSeekPositions +
+          ", taskSuccess=" + taskSuccess +
+          '}';
+    }
+  }
+
+  default TaskResult buildTaskResult(boolean success) {
+    return new TaskResult.Builder()
+        .setTaskName(getTaskName())
+        .setTaskSuccess(success)
+        .build();
+  }
 }
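
For context, a hedged sketch of how a task implementation might use this new
contract to report a partial failure together with a resume position. The
sub-task name "fsoSubTask" and the helper applyEventsFrom(...) are
illustrative assumptions, not part of this patch:

    @Override
    public TaskResult process(OMUpdateEventBatch events,
                              Map<String, Integer> subTaskSeekPosMap) {
      // Resume from the last successfully flushed event if a retry is in
      // progress; 0 means start from the beginning of the batch.
      int seekPos = subTaskSeekPosMap.getOrDefault("fsoSubTask", 0);
      // applyEventsFrom is a hypothetical helper: it applies events starting
      // at seekPos and returns the position of the first event it failed to
      // flush, or -1 if the whole batch was applied.
      int failedAt = applyEventsFrom(events, seekPos);
      if (failedAt < 0) {
        return buildTaskResult(true);
      }
      // Hand the position back so the controller can retry from there.
      return new TaskResult.Builder()
          .setTaskName(getTaskName())
          .setSubTaskSeekPositions(
              Collections.singletonMap("fsoSubTask", failedAt))
          .setTaskSuccess(false)
          .build();
    }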
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index c1d786db1a..e289b6ae15 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -24,6 +24,7 @@
 import com.google.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +37,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.recon.ReconConstants;
@@ -97,27 +97,27 @@ public void registerTask(ReconOmTask task) {
   @Override
   public synchronized void consumeOMEvents(OMUpdateEventBatch events, 
OMMetadataManager omMetadataManager) {
     if (!events.isEmpty()) {
-      Collection<NamedCallableTask<Pair<String, Boolean>>> tasks = new 
ArrayList<>();
-      List<String> failedTasks = new ArrayList<>();
+      Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new 
ArrayList<>();
+      List<ReconOmTask.TaskResult> failedTasks = new ArrayList<>();
       for (Map.Entry<String, ReconOmTask> taskEntry :
           reconOmTasks.entrySet()) {
         ReconOmTask task = taskEntry.getValue();
         ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
         taskStatusUpdater.recordRunStart();
        // Events passed to the process method are no longer filtered.
-        tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> 
task.process(events)));
+        tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> 
task.process(events, Collections.emptyMap())));
       }
       processTasks(tasks, events, failedTasks);
 
       // Retry processing failed tasks
-      List<String> retryFailedTasks = new ArrayList<>();
+      List<ReconOmTask.TaskResult> retryFailedTasks = new ArrayList<>();
       if (!failedTasks.isEmpty()) {
         tasks.clear();
-        for (String taskName : failedTasks) {
-          ReconOmTask task = reconOmTasks.get(taskName);
+        for (ReconOmTask.TaskResult taskResult : failedTasks) {
+          ReconOmTask task = reconOmTasks.get(taskResult.getTaskName());
          // Events passed to the process method are no longer filtered.
           tasks.add(new NamedCallableTask<>(task.getTaskName(),
-              () -> task.process(events)));
+              () -> task.process(events, 
taskResult.getSubTaskSeekPositions())));
         }
         processTasks(tasks, events, retryFailedTasks);
       }
@@ -126,12 +126,15 @@ public synchronized void 
consumeOMEvents(OMUpdateEventBatch events, OMMetadataMa
       ReconConstants.resetTableTruncatedFlags();
       if (!retryFailedTasks.isEmpty()) {
         tasks.clear();
-        for (String taskName : failedTasks) {
-          ReconOmTask task = reconOmTasks.get(taskName);
+        for (ReconOmTask.TaskResult taskResult : failedTasks) {
+          ReconOmTask task = reconOmTasks.get(taskResult.getTaskName());
           tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> 
task.reprocess(omMetadataManager)));
         }
-        List<String> reprocessFailedTasks = new ArrayList<>();
+        List<ReconOmTask.TaskResult> reprocessFailedTasks = new ArrayList<>();
         processTasks(tasks, events, reprocessFailedTasks);
+        // The assumption here is that if even a full reprocess of a task
+        // fails, something is wrong with the Recon RocksDB obtained from OM,
+        // and it needs to be investigated.
         ignoreFailedTasks(reprocessFailedTasks);
       }
     }
@@ -141,8 +144,9 @@ public synchronized void consumeOMEvents(OMUpdateEventBatch 
events, OMMetadataMa
    * Ignore tasks that failed reprocess step more than threshold times.
    * @param failedTasks list of failed tasks.
    */
-  private void ignoreFailedTasks(List<String> failedTasks) {
-    for (String taskName : failedTasks) {
+  private void ignoreFailedTasks(List<ReconOmTask.TaskResult> failedTasks) {
+    for (ReconOmTask.TaskResult taskResult : failedTasks) {
+      String taskName = taskResult.getTaskName();
       LOG.info("Reprocess step failed for task {}.", taskName);
       if (taskFailureCounter.get(taskName).incrementAndGet() >
           TASK_FAILURE_THRESHOLD) {
@@ -155,14 +159,16 @@ private void ignoreFailedTasks(List<String> failedTasks) {
 
   @Override
   public synchronized void reInitializeTasks(ReconOMMetadataManager 
omMetadataManager) {
-    Collection<NamedCallableTask<Pair<String, Boolean>>> tasks = new 
ArrayList<>();
+    Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new 
ArrayList<>();
     ReconConstants.resetTableTruncatedFlags();
     for (Map.Entry<String, ReconOmTask> taskEntry :
         reconOmTasks.entrySet()) {
       ReconOmTask task = taskEntry.getValue();
-      ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
+      ReconTaskStatusUpdater taskStatusUpdater =
+          taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
       taskStatusUpdater.recordRunStart();
-      tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> 
task.reprocess(omMetadataManager)));
+      tasks.add(new NamedCallableTask<>(task.getTaskName(),
+          () -> task.reprocess(omMetadataManager)));
     }
 
     try {
@@ -178,14 +184,16 @@ public synchronized void 
reInitializeTasks(ReconOMMetadataManager omMetadataMana
               throw new TaskExecutionException(task.getTaskName(), e);
             }
           }, executorService).thenAccept(result -> {
-            String taskName = result.getLeft();
-            ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-            if (!result.getRight()) {
+            String taskName = result.getTaskName();
+            ReconTaskStatusUpdater taskStatusUpdater =
+                taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+            if (!result.isTaskSuccess()) {
               LOG.error("Init failed for task {}.", taskName);
               taskStatusUpdater.setLastTaskRunStatus(-1);
             } else {
               taskStatusUpdater.setLastTaskRunStatus(0);
-              
taskStatusUpdater.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB());
+              taskStatusUpdater.setLastUpdatedSeqNumber(
+                  omMetadataManager.getLastSequenceNumberFromDB());
             }
             taskStatusUpdater.recordRunCompletion();
           }).exceptionally(ex -> {
@@ -237,8 +245,9 @@ public synchronized void stop() {
    * @param events      A batch of {@link OMUpdateEventBatch} events to fetch 
sequence number of last event in batch.
    * @param failedTasks Reference of the list to which we want to add the 
failed tasks for retry/reprocessing
    */
-  private void processTasks(Collection<NamedCallableTask<Pair<String, 
Boolean>>> tasks,
-                            OMUpdateEventBatch events, List<String> 
failedTasks) {
+  private void processTasks(
+      Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks,
+      OMUpdateEventBatch events, List<ReconOmTask.TaskResult> failedTasks) {
     List<CompletableFuture<Void>> futures = tasks.stream()
         .map(task -> CompletableFuture.supplyAsync(() -> {
           try {
@@ -251,11 +260,15 @@ private void 
processTasks(Collection<NamedCallableTask<Pair<String, Boolean>>> t
             throw new TaskExecutionException(task.getTaskName(), e);
           }
         }, executorService).thenAccept(result -> {
-          String taskName = result.getLeft();
-          ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-          if (!result.getRight()) {
+          String taskName = result.getTaskName();
+          ReconTaskStatusUpdater taskStatusUpdater =
+              taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+          if (!result.isTaskSuccess()) {
             LOG.error("Task {} failed", taskName);
-            failedTasks.add(result.getLeft());
+            failedTasks.add(new ReconOmTask.TaskResult.Builder()
+                .setTaskName(taskName)
+                .setSubTaskSeekPositions(result.getSubTaskSeekPositions())
+                .build());
             taskStatusUpdater.setLastTaskRunStatus(-1);
           } else {
             taskFailureCounter.get(taskName).set(0);
@@ -270,7 +283,8 @@ private void 
processTasks(Collection<NamedCallableTask<Pair<String, Boolean>>> t
             String taskName = taskEx.getTaskName();
             LOG.error("The above error occurred while trying to execute task: 
{}", taskName);
 
-            ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+            ReconTaskStatusUpdater taskStatusUpdater =
+                taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
             taskStatusUpdater.setLastTaskRunStatus(-1);
             taskStatusUpdater.recordRunCompletion();
           }
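
Taken together, the controller changes above amount to a three-stage recovery
pipeline; condensed to its control flow (an outline of the code in this diff,
not new API):

    // Stage 1: run every registered task against the event batch.
    task.process(events, Collections.emptyMap());
    // Stage 2: retry each failed task, resuming from the seek positions it
    // reported in its TaskResult.
    task.process(events, taskResult.getSubTaskSeekPositions());
    // Stage 3: if the retry also fails, fall back to a full reprocess; a task
    // that keeps failing is ignored once taskFailureCounter crosses
    // TASK_FAILURE_THRESHOLD.
    task.reprocess(omMetadataManager);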
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
index 9efd3c9e99..a81c5d1833 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
@@ -477,7 +477,7 @@ public void testGetKeysForContainer() throws IOException {
     setUpFSOData();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, new OzoneConfiguration());
+            reconOMMetadataManager, new OzoneConfiguration(), 10);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
     // Reprocess the container key mapper to ensure the latest mapping is used
     reprocessContainerKeyMapper();
@@ -565,7 +565,7 @@ public void testGetKeysForContainerWithPrevKey() throws 
IOException {
     reprocessContainerKeyMapper();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, new OzoneConfiguration());
+            reconOMMetadataManager, new OzoneConfiguration(), 10);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
     response = containerEndpoint.getKeysForContainer(20L, -1, "/0/1/2/file7");
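
The new trailing constructor argument (10 here) is the NSSummary flush-to-DB
threshold added by this patch. A hedged sketch of wiring it from
configuration rather than hard-coding it, mirroring the test setup that
appears later in this diff:

    long threshold = conf.getLong(
        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
    NSSummaryTaskWithFSO task = new NSSummaryTaskWithFSO(
        reconNamespaceSummaryManager, reconOMMetadataManager, conf, threshold);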
 
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 52ac1d64f4..9b16643d0f 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -66,7 +66,6 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -135,6 +134,7 @@
 import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
 import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
+import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
@@ -789,9 +789,9 @@ public void testGetClusterState() throws Exception {
     });
     omTableInsightTask.init();
     // check volume, bucket and key count after running table count task
-    Pair<String, Boolean> result =
+    ReconOmTask.TaskResult result =
         omTableInsightTask.reprocess(reconOMMetadataManager);
-    assertTrue(result.getRight());
+    assertTrue(result.isTaskSuccess());
     response = clusterStateEndpoint.getClusterState();
     clusterStateResponse = (ClusterStateResponse) response.getEntity();
     assertEquals(2, clusterStateResponse.getVolumes());
@@ -869,10 +869,10 @@ public void testGetFileCounts() throws Exception {
         .thenReturn(omKeyInfo3);
 
     // Call reprocess on both endpoints.
-    Pair<String, Boolean> resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
-    Pair<String, Boolean> resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
-    assertTrue(resultOBS.getRight());
-    assertTrue(resultFSO.getRight());
+    ReconOmTask.TaskResult resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+    ReconOmTask.TaskResult resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+    assertTrue(resultOBS.isTaskSuccess());
+    assertTrue(resultFSO.isTaskSuccess());
 
     // The two tasks should result in 3 rows.
     assertEquals(3, fileCountBySizeDao.count());
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
index 810c9096c9..84d0807a34 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
@@ -106,7 +106,7 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, ozoneConfiguration);
+            reconOMMetadataManager, ozoneConfiguration, 10);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
   }
 
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
index edb495421f..77d5bb69a5 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
@@ -389,7 +389,7 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, ozoneConfiguration);
+            reconOMMetadataManager, ozoneConfiguration, 10);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
     commonUtils = new CommonUtils();
   }
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
index 8dca6ed566..1a01e12543 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
@@ -386,7 +386,7 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy = 
         new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager, 
-                                    reconOMMetadataManager, conf);
+                                    reconOMMetadataManager, conf, 10);
     nsSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
     commonUtils = new CommonUtils();
   }
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
index 7743b5aa39..a162f48d82 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
@@ -382,11 +382,11 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithOBS nsSummaryTaskWithOBS =
         new NSSummaryTaskWithOBS(reconNamespaceSummaryManager,
-            reconOMMetadataManager, conf);
+            reconOMMetadataManager, conf, 10);
     nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
     NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy =
         new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager,
-            reconOMMetadataManager, conf);
+            reconOMMetadataManager, conf, 10);
     nsSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
     commonUtils = new CommonUtils();
   }
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index 065c5ac2ca..26fb6fe21b 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -321,13 +321,13 @@ public void setUp() throws Exception {
     setUpOmData();
     nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
         reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconOMMetadataManager, ozoneConfiguration, 10);
     nsSummaryTaskWithOBS  = new NSSummaryTaskWithOBS(
         reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconOMMetadataManager, ozoneConfiguration, 10);
     nsSummaryTaskWithFSO  = new NSSummaryTaskWithFSO(
         reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconOMMetadataManager, ozoneConfiguration, 10);
     reconNamespaceSummaryManager.clearNSSummaryTable();
     nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
     nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
index ce84eec418..5123619416 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
@@ -116,7 +116,7 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, ozoneConfiguration);
+            reconOMMetadataManager, ozoneConfiguration, 10);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
   }
 
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
index aec2b7d6e2..2fc1c320d1 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
@@ -19,8 +19,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Map;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 
 /**
@@ -52,20 +51,21 @@ public Collection<String> getTaskTables() {
   }
 
   @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public TaskResult process(
+      OMUpdateEventBatch events, Map<String, Integer> seekPos) {
     if (++callCtr <= numFailuresAllowed) {
-      return new ImmutablePair<>(getTaskName(), false);
+      return buildTaskResult(false);
     } else {
-      return new ImmutablePair<>(getTaskName(), true);
+      return buildTaskResult(true);
     }
   }
 
   @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
     if (++callCtr <= numFailuresAllowed) {
-      return new ImmutablePair<>(getTaskName(), false);
+      return buildTaskResult(false);
     } else {
-      return new ImmutablePair<>(getTaskName(), true);
+      return buildTaskResult(true);
     }
   }
 
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
index b78d970bac..36b335c1b4 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
@@ -336,7 +336,7 @@ public void testKeyTableProcess() throws IOException {
     assertEquals(1, reconContainerMetadataManager.getKeyCountForContainer(3L));
 
     // Process PUT & DELETE event.
-    containerKeyMapperTask.process(omUpdateEventBatch);
+    containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap());
 
     keyPrefixesForContainer = reconContainerMetadataManager
         .getKeyPrefixesForContainer(1);
@@ -427,7 +427,7 @@ public void testFileTableProcess() throws Exception {
         }, 0L);
 
     // Process PUT event for both the keys
-    containerKeyMapperTask.process(omUpdateEventBatch);
+    containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap());
 
     keyPrefixesForContainer = reconContainerMetadataManager
         .getKeyPrefixesForContainer(1);
@@ -460,7 +460,7 @@ public void testFileTableProcess() throws Exception {
         }, 0L);
 
     // Process DELETE event for key2
-    containerKeyMapperTask.process(omUpdateEventBatch2);
+    containerKeyMapperTask.process(omUpdateEventBatch2, 
Collections.emptyMap());
 
     keyPrefixesForContainer = reconContainerMetadataManager
         .getKeyPrefixesForContainer(1);
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index 30ca22154b..fc2f152ac0 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -33,8 +33,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
@@ -130,12 +130,12 @@ public void testReprocess() throws IOException {
     fileCountBySizeDao.insert(new FileCountBySize("vol1", "bucket1", 1024L, 
10L));
 
     // Call reprocess on both tasks.
-    Pair<String, Boolean> resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
-    Pair<String, Boolean> resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+    ReconOmTask.TaskResult resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+    ReconOmTask.TaskResult resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
 
     // Verify that both tasks reported success.
-    assertTrue(resultOBS.getRight(), "OBS reprocess should return true");
-    assertTrue(resultFSO.getRight(), "FSO reprocess should return true");
+    assertTrue(resultOBS.isTaskSuccess(), "OBS reprocess should return true");
+    assertTrue(resultFSO.isTaskSuccess(), "FSO reprocess should return true");
 
     // After processing, there should be 3 rows (one per bin).
     assertEquals(3, fileCountBySizeDao.count(), "Expected 3 rows in the DB");
@@ -197,8 +197,8 @@ public void testProcess() {
         new OMUpdateEventBatch(Arrays.asList(event, event2), 0L);
 
     // Process the same batch on both endpoints.
-    fileSizeCountTaskOBS.process(omUpdateEventBatch);
-    fileSizeCountTaskFSO.process(omUpdateEventBatch);
+    fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+    fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
 
     // After processing the first batch:
     // Since each endpoint processes the same events, the counts are doubled.
@@ -256,8 +256,8 @@ public void testProcess() {
 
     omUpdateEventBatch = new OMUpdateEventBatch(
         Arrays.asList(updateEvent, putEvent, deleteEvent), 0L);
-    fileSizeCountTaskOBS.process(omUpdateEventBatch);
-    fileSizeCountTaskFSO.process(omUpdateEventBatch);
+    fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+    fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
 
     assertEquals(4, fileCountBySizeDao.count());
     recordToFind.value3(1024L);
@@ -322,10 +322,10 @@ public void testReprocessAtScale() throws IOException {
     
when(mockKeyValueFso.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList));
 
     // Call reprocess on both endpoints.
-    Pair<String, Boolean> resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
-    Pair<String, Boolean> resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
-    assertTrue(resultOBS.getRight());
-    assertTrue(resultFSO.getRight());
+    ReconOmTask.TaskResult resultOBS = 
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+    ReconOmTask.TaskResult resultFSO = 
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+    assertTrue(resultOBS.isTaskSuccess());
+    assertTrue(resultFSO.isTaskSuccess());
 
     // 2 volumes * 500 buckets * 42 bins = 42000 rows
     assertEquals(42000, fileCountBySizeDao.count());
@@ -393,8 +393,8 @@ public void testProcessAtScale() {
 
     OMUpdateEventBatch omUpdateEventBatch = new 
OMUpdateEventBatch(omDbEventList, 0L);
     // Process the same batch on both endpoints.
-    fileSizeCountTaskOBS.process(omUpdateEventBatch);
-    fileSizeCountTaskFSO.process(omUpdateEventBatch);
+    fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+    fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
 
     // Verify 2 keys are in correct bins.
     assertEquals(10000, fileCountBySizeDao.count());
@@ -470,8 +470,8 @@ public void testProcessAtScale() {
     }
 
     omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L);
-    fileSizeCountTaskOBS.process(omUpdateEventBatch);
-    fileSizeCountTaskFSO.process(omUpdateEventBatch);
+    fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+    fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
 
     assertEquals(10000, fileCountBySizeDao.count());
     recordToFind = dslContext
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
index e3241287ea..ea7efaddfd 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
@@ -29,6 +29,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Set;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -220,7 +221,7 @@ public class TestProcess {
     @BeforeEach
     public void setUp() throws IOException {
       nSSummaryTask.reprocess(reconOMMetadataManager);
-      nSSummaryTask.process(processEventBatch());
+      nSSummaryTask.process(processEventBatch(), Collections.emptyMap());
 
       nsSummaryForBucket1 =
           reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
index eb38084b4d..83b7222405 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.recon.tasks;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
@@ -27,12 +28,16 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -47,12 +52,14 @@
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 
 /**
  * Test for NSSummaryTaskWithFSO.
@@ -118,7 +125,7 @@ public class TestNSSummaryTaskWithFSO {
   void setUp(@TempDir File tmpDir) throws Exception {
     ozoneConfiguration = new OzoneConfiguration();
     ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
-        10);
+        3);
     omMetadataManager = initializeNewOmMetadataManager(new File(tmpDir, "om"));
     OzoneManagerServiceProvider ozoneManagerServiceProvider =
         getMockOzoneManagerServiceProviderWithFSO();
@@ -141,9 +148,11 @@ void setUp(@TempDir File tmpDir) throws Exception {
 
     populateOMDB();
 
+    long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 3);
     nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
         reconNamespaceSummaryManager, reconOMMetadataManager,
-        ozoneConfiguration);
+        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
   }
 
   /**
@@ -313,10 +322,12 @@ public class TestProcess {
     private OMDBUpdateEvent keyEvent6;
     private OMDBUpdateEvent keyEvent7;
 
+    private Pair<Integer, Boolean> result;
+
     @BeforeEach
     public void setUp() throws IOException {
       nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
-      nSSummaryTaskWithFso.processWithFSO(processEventBatch());
+      result = nSSummaryTaskWithFso.processWithFSO(processEventBatch(), 0);
     }
 
     private OMUpdateEventBatch processEventBatch() throws IOException {
@@ -511,6 +522,97 @@ public void testParentIdAfterProcessEventBatch() throws 
IOException {
           "DIR_FIVE's parent ID should match BUCKET_TWO_OBJECT_ID.");
     }
 
+    @Test
+    void testProcessWithFSOFlushAfterThresholdAndSuccess() throws IOException {
+      // processWithFSO was already invoked in setUp(); verify its result here.
+
+      // Assertions
+      Assertions.assertNotNull(result, "Result should not be null");
+      // Why should seekPos be 7? The flush threshold is set to 3 and there
+      // are 7 events in total, so the nsSummaryMap is flushed in two batches
+      // inside the loop; after the second batch flush, eventCounter is 6,
+      // and the remaining event7 is flushed on its own after the loop. At
+      // every threshold-based batch flush, seekPos is set to
+      // eventCounter + 1, so seekPos ends up as 7.
+      Assertions.assertEquals(7, result.getLeft(), "seekPos should be 7");
+      Assertions.assertTrue(result.getRight(), "The processing should succeed");
+    }
+
+    @Test
+    void testProcessWithFSOFlushAfterThresholdAndFailureOfLastElement()
+        throws NoSuchFieldException, IllegalAccessException {
+      // Create a mock NSSummaryTaskWithFSO to drive the flush-failure scenario
+      NSSummaryTaskWithFSO task = mock(NSSummaryTaskWithFSO.class);
+
+      // Set the value of nsSummaryFlushToDBMaxThreshold to 3 using reflection
+      Field thresholdField = 
NSSummaryTaskWithFSO.class.getDeclaredField("nsSummaryFlushToDBMaxThreshold");
+      thresholdField.setAccessible(true);
+      thresholdField.set(task, 3);
+
+      ReconNamespaceSummaryManager mockReconNamespaceSummaryManager = 
mock(ReconNamespaceSummaryManager.class);
+      Field managerField = 
NSSummaryTaskDbEventHandler.class.getDeclaredField("reconNamespaceSummaryManager");
+      managerField.setAccessible(true);
+      managerField.set(task, mockReconNamespaceSummaryManager);
+
+      // Mock the OMUpdateEventBatch and its iterator
+      OMUpdateEventBatch events = mock(OMUpdateEventBatch.class);
+      Iterator<OMDBUpdateEvent> mockIterator = mock(Iterator.class);
+
+      Mockito.when(events.getIterator()).thenReturn(mockIterator);
+
+      // Mock OMDBUpdateEvent objects and their behavior
+      OMDBUpdateEvent<String, OmKeyInfo> event1 = mock(OMDBUpdateEvent.class);
+      OMDBUpdateEvent<String, OmKeyInfo> event2 = mock(OMDBUpdateEvent.class);
+      OMDBUpdateEvent<String, OmKeyInfo> event3 = mock(OMDBUpdateEvent.class);
+      OMDBUpdateEvent<String, OmKeyInfo> event4 = mock(OMDBUpdateEvent.class);
+
+      // Mock getAction() for each event
+      
Mockito.when(event1.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+      
Mockito.when(event2.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+      
Mockito.when(event3.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+      
Mockito.when(event4.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+
+      OmKeyInfo keyInfo1 = new 
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(2).setKeyName("key1")
+          .setBucketName("bucket1")
+          .setDataSize(1024).setVolumeName("volume1").build();
+      OmKeyInfo keyInfo2 = new 
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2")
+          .setBucketName("bucket1")
+          .setDataSize(1024).setVolumeName("volume1").build();
+      OmKeyInfo keyInfo3 = new 
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2")
+          .setBucketName("bucket1")
+          .setDataSize(1024).setVolumeName("volume1").build();
+      OmKeyInfo keyInfo4 = new 
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2")
+          .setBucketName("bucket1")
+          .setDataSize(1024).setVolumeName("volume1").build();
+      Mockito.when(event1.getValue()).thenReturn(keyInfo1);
+      Mockito.when(event2.getValue()).thenReturn(keyInfo2);
+      Mockito.when(event3.getValue()).thenReturn(keyInfo3);
+      Mockito.when(event4.getValue()).thenReturn(keyInfo4);
+
+      // Mock getTable() to return valid table name
+      Mockito.when(event1.getTable()).thenReturn(FILE_TABLE);
+      Mockito.when(event2.getTable()).thenReturn(FILE_TABLE);
+      Mockito.when(event3.getTable()).thenReturn(FILE_TABLE);
+      Mockito.when(event4.getTable()).thenReturn(FILE_TABLE);
+
+      // Mock iterator to return the events
+      Mockito.when(mockIterator.hasNext()).thenReturn(true, true, true, true, 
false);
+      Mockito.when(mockIterator.next()).thenReturn(event1, event2, event3, 
event4);
+
+      // Mock the flushAndCommitNSToDB method to fail on the last flush
+      NSSummaryTaskWithFSO taskSpy = Mockito.spy(task);
+      
Mockito.doReturn(true).doReturn(true).doReturn(false).when(taskSpy).flushAndCommitNSToDB(Mockito.anyMap());
+
+      // Call the method under test
+      Pair<Integer, Boolean> result1 = taskSpy.processWithFSO(events, 0);
+
+      // Assertions
+      Assertions.assertNotNull(result1, "Result should not be null");
+      Assertions.assertEquals(0, result1.getLeft(), "seekPos should be 0");
+
+      // Verify interactions
+      Mockito.verify(mockIterator, Mockito.times(3)).next();
+      Mockito.verify(taskSpy, 
Mockito.times(1)).flushAndCommitNSToDB(Mockito.anyMap());
+    }
   }
 
   /**
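
To make the flush arithmetic asserted above concrete, a worked trace with the
threshold of 3 and the 7 events from processEventBatch() (layout
illustrative):

    events:        e1 e2 e3 | e4 e5 e6 | e7
    eventCounter:   1  2  3 |  4  5  6 |  7
    flushes:              ^          ^    ^ (remainder, after the loop)
    seekPos = eventCounter + 1 at each threshold flush: 4, then 7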
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
index ceb88a3656..4d1f58e671 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
@@ -23,6 +23,7 @@
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -36,8 +37,8 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmConfig;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -142,9 +143,11 @@ void setUp(@TempDir File tmpDir) throws Exception {
 
     populateOMDB();
 
+    long nsSummaryFlushToDBMaxThreshold = omConfiguration.getLong(
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 10);
     nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
         reconNamespaceSummaryManager,
-        reconOMMetadataManager, omConfiguration);
+        reconOMMetadataManager, omConfiguration, 
nsSummaryFlushToDBMaxThreshold);
   }
 
   /**
@@ -292,7 +295,7 @@ public class TestProcess {
     @BeforeEach
     public void setUp() throws IOException {
       nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
-      nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch());
+      nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch(), 0);
 
       nsSummaryForBucket1 =
           reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
@@ -689,8 +692,8 @@ private void initializeNewOmMetadataManager(
     omConfiguration = new OzoneConfiguration();
     omConfiguration.set(OZONE_OM_DB_DIRS,
         omDbDir.getAbsolutePath());
-    omConfiguration.set(OMConfigKeys
-        .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
+    omConfiguration.set(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS, "true");
+    omConfiguration.set(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, "10");
     omMetadataManager = new OmMetadataManagerImpl(
         omConfiguration, null);
 
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
index 766478b871..48054d1eed 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
@@ -22,6 +22,7 @@
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -115,6 +116,8 @@ void setUp(@TempDir File tmpDir) throws Exception {
     ozoneConfiguration = new OzoneConfiguration();
     
ozoneConfiguration.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
         false);
+    ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+        10);
 
     ReconTestInjector reconTestInjector =
         new ReconTestInjector.Builder(tmpDir)
@@ -132,9 +135,12 @@ void setUp(@TempDir File tmpDir) throws Exception {
 
     populateOMDB();
 
+    long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 10);
     nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
         reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconOMMetadataManager, ozoneConfiguration,
+        nsSummaryFlushToDBMaxThreshold);
   }
 
   /**
@@ -240,7 +246,7 @@ public void setUp() throws IOException {
       // reinit Recon RocksDB's namespace CF.
       reconNamespaceSummaryManager.clearNSSummaryTable();
       nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
-      nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch());
+      nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch(), 0);
 
       nsSummaryForBucket1 =
           reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
index 0db53aee12..386a5539f1 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
@@ -22,6 +22,8 @@
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -34,8 +36,8 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmConfig;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -126,9 +128,13 @@ void setUp(@TempDir File tmpDir) throws Exception {
 
     populateOMDB();
 
+    long nsSummaryFlushToDBMaxThreshold = omConfiguration.getLong(
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
     nSSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
         reconNamespaceSummaryManager,
-        reconOMMetadataManager, omConfiguration);
+        reconOMMetadataManager, omConfiguration,
+        nsSummaryFlushToDBMaxThreshold);
   }
 
   /**
@@ -234,7 +240,7 @@ public void setUp() throws IOException {
       // reinit Recon RocksDB's namespace CF.
       reconNamespaceSummaryManager.clearNSSummaryTable();
       nSSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
-      nSSummaryTaskWithOBS.processWithOBS(processEventBatch());
+      nSSummaryTaskWithOBS.processWithOBS(processEventBatch(), 0);
 
       nsSummaryForBucket1 =
           reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
@@ -458,8 +464,8 @@ private void initializeNewOmMetadataManager(
     omConfiguration = new OzoneConfiguration();
     omConfiguration.set(OZONE_OM_DB_DIRS,
         omDbDir.getAbsolutePath());
-    omConfiguration.set(OMConfigKeys
-        .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
+    omConfiguration.set(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS, "true");
+    omConfiguration.set(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, "10");
     omMetadataManager = new OmMetadataManagerImpl(
         omConfiguration, null);
 
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
index 040975a5ef..dc8d34f335 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -45,9 +45,9 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -155,7 +155,7 @@ private void initializeInjector() throws IOException {
         globalStatsDao, getConfiguration(), reconOMMetadataManager);
     nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
         reconNamespaceSummaryManager, reconOMMetadataManager,
-        ozoneConfiguration);
+        ozoneConfiguration, 10);
     dslContext = getDslContext();
 
     omTableInsightTask.setTables(omTableInsightTask.getTaskTables());
@@ -289,9 +289,9 @@ public void testReprocessForDeletedDirectory() throws 
Exception {
     // Generate NamespaceSummary for the OM DB
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
 
-    Pair<String, Boolean> result =
+    ReconOmTask.TaskResult result =
         omTableInsightTask.reprocess(reconOMMetadataManager);
-    assertTrue(result.getRight());
+    assertTrue(result.isTaskSuccess());
     assertEquals(3, getCountForTable(DELETED_DIR_TABLE));
   }
 
@@ -329,7 +329,7 @@ public void testProcessForDeletedDirectoryTable() throws 
IOException {
           DELETED_DIR_TABLE, PUT, null));
     }
     OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L);
-    omTableInsightTask.process(putEventBatch);
+    omTableInsightTask.process(putEventBatch, Collections.emptyMap());
     assertEquals(5, getCountForTable(DELETED_DIR_TABLE));
 
 
@@ -343,7 +343,7 @@ public void testProcessForDeletedDirectoryTable() throws 
IOException {
         getOmKeyInfo("vol1", "bucket1", DIR_ONE, 3L, false), DELETED_DIR_TABLE,
         DELETE, null));
     OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents, 
0L);
-    omTableInsightTask.process(deleteEventBatch);
+    omTableInsightTask.process(deleteEventBatch, Collections.emptyMap());
     assertEquals(3, getCountForTable(DELETED_DIR_TABLE));
   }
 
@@ -376,10 +376,10 @@ public void testReprocessForCount() throws Exception {
       when(mockIter.next()).thenReturn(mockKeyValue);
     }
 
-    Pair<String, Boolean> result =
+    ReconOmTask.TaskResult result =
         omTableInsightTask.reprocess(omMetadataManager);
 
-    assertTrue(result.getRight());
+    assertTrue(result.isTaskSuccess());
     assertEquals(5L, getCountForTable(KEY_TABLE));
     assertEquals(5L, getCountForTable(VOLUME_TABLE));
     assertEquals(5L, getCountForTable(BUCKET_TABLE));
@@ -397,9 +397,9 @@ public void testReprocessForOpenKeyTable() throws Exception 
{
     writeOpenKeyToOm(reconOMMetadataManager,
         "key1", "Bucket3", "Volume3", null, 3L);
 
-    Pair<String, Boolean> result =
+    ReconOmTask.TaskResult result =
         omTableInsightTask.reprocess(reconOMMetadataManager);
-    assertTrue(result.getRight());
+    assertTrue(result.isTaskSuccess());
     assertEquals(3L, getCountForTable(OPEN_KEY_TABLE));
     // Test for both replicated and unreplicated size for OPEN_KEY_TABLE
     assertEquals(6L, getUnReplicatedSizeForTable(OPEN_KEY_TABLE));
@@ -416,9 +416,9 @@ public void testReprocessForOpenFileTable() throws 
Exception {
     writeOpenFileToOm(reconOMMetadataManager,
         "file3", "Bucket3", "Volume3", "file3", 3, 0, 3, 3, null, 3L);
 
-    Pair<String, Boolean> result =
+    ReconOmTask.TaskResult result =
         omTableInsightTask.reprocess(reconOMMetadataManager);
-    assertTrue(result.getRight());
+    assertTrue(result.isTaskSuccess());
     assertEquals(3L, getCountForTable(OPEN_FILE_TABLE));
     // Test for both replicated and unreplicated size for OPEN_FILE_TABLE
     assertEquals(6L, getUnReplicatedSizeForTable(OPEN_FILE_TABLE));
@@ -440,9 +440,9 @@ public void testReprocessForDeletedTable() throws Exception 
{
         deletedKeysList3, "Bucket3", "Volume3");
 
 
-    Pair<String, Boolean> result =
+    ReconOmTask.TaskResult result =
         omTableInsightTask.reprocess(reconOMMetadataManager);
-    assertTrue(result.getRight());
+    assertTrue(result.isTaskSuccess());
     assertEquals(6L, getCountForTable(DELETED_TABLE));
     // Test for both replicated and unreplicated size for DELETED_TABLE
     assertEquals(600L, getUnReplicatedSizeForTable(DELETED_TABLE));
@@ -479,7 +479,7 @@ public void testProcessForCount() {
 
     // Processing the initial batch of events
     OMUpdateEventBatch initialBatch = new OMUpdateEventBatch(initialEvents, 
0L);
-    omTableInsightTask.process(initialBatch);
+    omTableInsightTask.process(initialBatch, Collections.emptyMap());
 
     // Verifying the count in each table
     for (String tableName : omTableInsightTask.getTaskTables()) {
@@ -508,7 +508,7 @@ public void testProcessForCount() {
     // Processing the additional events
     OMUpdateEventBatch additionalBatch =
         new OMUpdateEventBatch(additionalEvents, 0L);
-    omTableInsightTask.process(additionalBatch);
+    omTableInsightTask.process(additionalBatch, Collections.emptyMap());
     // Verifying the final count in each table
     for (String tableName : omTableInsightTask.getTaskTables()) {
       if (tableName.equals(DELETED_TABLE)) {
@@ -537,7 +537,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() {
     }
 
     OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L);
-    omTableInsightTask.process(putEventBatch);
+    omTableInsightTask.process(putEventBatch, Collections.emptyMap());
 
     // After 5 PUTs, size should be 5 * 1000 = 5000
     for (String tableName : new ArrayList<>(
@@ -555,7 +555,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() {
         getOMUpdateEvent("item0", omKeyInfo, OPEN_FILE_TABLE, DELETE, null));
 
     OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents, 0L);
-    omTableInsightTask.process(deleteEventBatch);
+    omTableInsightTask.process(deleteEventBatch, Collections.emptyMap());
 
     // After deleting "item0", size should be 4 * 1000 = 4000
     for (String tableName : new ArrayList<>(
@@ -578,7 +578,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() {
     }
 
     OMUpdateEventBatch updateEventBatch = new OMUpdateEventBatch(updateEvents, 0L);
-    omTableInsightTask.process(updateEventBatch);
+    omTableInsightTask.process(updateEventBatch, Collections.emptyMap());
 
     // After updating "item1", size should be 4000 - 1000 + 2000 = 5000
     //  presentValue - oldValue + newValue = updatedValue
@@ -615,7 +615,7 @@ public void testProcessForDeletedTable() {
               null));
     }
     OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L);
-    omTableInsightTask.process(putEventBatch);
+    omTableInsightTask.process(putEventBatch, Collections.emptyMap());
     // Each of the 5 RepeatedOmKeyInfo object has 5 OmKeyInfo obj,
     // so total deleted keys should be 5 * 5 = 25
     assertEquals(25L, getCountForTable(DELETED_TABLE));
@@ -631,7 +631,7 @@ public void testProcessForDeletedTable() {
         getOMUpdateEvent("item0", repeatedOmKeyInfo, DELETED_TABLE, DELETE,
             null));
     OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents, 0L);
-    omTableInsightTask.process(deleteEventBatch);
+    omTableInsightTask.process(deleteEventBatch, Collections.emptyMap());
     // After deleting "item0" total deleted keys should be 20
     assertEquals(20L, getCountForTable(DELETED_TABLE));
     // After deleting "item0", size should be 4 * 1000 = 4000
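
    (Context for the assertions above: this patch changes ReconOmTask so that
    process() takes a sub-task seek-position map alongside the event batch, and
    both process() and reprocess() return a ReconOmTask.TaskResult built via a
    builder instead of a Pair<String, Boolean>. A minimal caller sketch, under
    the assumption that TaskResult also exposes a getTaskName() accessor and
    that "eventBatch" and "LOG" are hypothetical local names not in this patch:

        // Sketch: consuming the new TaskResult-based contract.
        ReconOmTask.TaskResult result =
            omTableInsightTask.process(eventBatch, Collections.emptyMap());
        if (!result.isTaskSuccess()) {
          // A false result tells the controller this task's data may be
          // stale, instead of silently persisting partial state.
          LOG.warn("Batch processing failed for task {}.",
              result.getTaskName());
        }
    )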
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
index 384da46aa1..ad7eafd160 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
@@ -21,6 +21,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -28,7 +29,6 @@
 import static org.mockito.Mockito.when;
 
 import java.util.HashSet;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
@@ -81,8 +81,8 @@ public void testRegisterTask() {
   @Test
   public void testConsumeOMEvents() throws Exception {
     ReconOmTask reconOmTaskMock = getMockTask("MockTask");
-    when(reconOmTaskMock.process(any(OMUpdateEventBatch.class)))
-        .thenReturn(new ImmutablePair<>("MockTask", true));
+    when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap()))
+        .thenReturn(new ReconOmTask.TaskResult.Builder().setTaskName("MockTask").setTaskSuccess(true).build());
     reconTaskController.registerTask(reconOmTaskMock);
     OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
     when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
@@ -94,7 +94,7 @@ public void testConsumeOMEvents() throws Exception {
         mock(OMMetadataManager.class));
 
     verify(reconOmTaskMock, times(1))
-        .process(any());
+        .process(any(), anyMap());
     long endTime = System.currentTimeMillis();
 
     ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask");
@@ -111,7 +111,7 @@ public void testTaskRecordsFailureOnException() throws Exception {
     OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
 
     // Throw exception when trying to run task
-    when(reconOmTaskMock.process(any(OMUpdateEventBatch.class)))
+    when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap()))
         .thenThrow(new RuntimeException("Mock Failure"));
     reconTaskController.registerTask(reconOmTaskMock);
     when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
@@ -123,7 +123,7 @@ public void testTaskRecordsFailureOnException() throws Exception {
         mock(OMMetadataManager.class));
 
     verify(reconOmTaskMock, times(1))
-        .process(any());
+        .process(any(), anyMap());
     long endTime = System.currentTimeMillis();
 
     ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask");
@@ -209,7 +209,7 @@ public void testReInitializeTasks() throws Exception {
     ReconOmTask reconOmTaskMock =
         getMockTask("MockTask2");
     when(reconOmTaskMock.reprocess(omMetadataManagerMock))
-        .thenReturn(new ImmutablePair<>("MockTask2", true));
+        .thenReturn(new ReconOmTask.TaskResult.Builder().setTaskName("MockTask2").setTaskSuccess(true).build());
     when(omMetadataManagerMock.getLastSequenceNumberFromDB()
     ).thenReturn(100L);
 


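(For readers following the interface change end-to-end: a minimal sketch of a
task under the new contract. This is illustrative only; the Map's type
parameters are an assumption, since the tests above only pass
Collections.emptyMap(), and the interface is assumed to keep getTaskName()
alongside the two-argument process().

    import java.util.Map;
    import org.apache.hadoop.ozone.om.OMMetadataManager;

    // Hypothetical task (not part of this patch) showing explicit
    // success/failure signalling through ReconOmTask.TaskResult.
    public class SketchOmTask implements ReconOmTask {
      @Override
      public String getTaskName() {
        return "SketchOmTask";
      }

      @Override
      public TaskResult process(OMUpdateEventBatch events,
          Map<String, Integer> subTaskSeekPosMap) {
        boolean success = true;
        // Apply each event here; flip success to false if a handler throws,
        // so the controller records the failure instead of persisting
        // partial, inconsistent state.
        return new TaskResult.Builder()
            .setTaskName(getTaskName())
            .setTaskSuccess(success)
            .build();
      }

      @Override
      public TaskResult reprocess(OMMetadataManager omMetadataManager) {
        // Full rebuild path; same explicit success signalling as process().
        return new TaskResult.Builder()
            .setTaskName(getTaskName())
            .setTaskSuccess(true)
            .build();
      }
    }
)
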
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
