This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 40951a4b7e HDDS-10634. Recon - listKeys API for listing keys with 
optional filters (#6658)
40951a4b7e is described below

commit 40951a4b7e405b299e2df3907ebf3075af6a73ce
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Sun May 26 13:26:57 2024 +0530

    HDDS-10634. Recon - listKeys API for listing keys with optional filters 
(#6658)
---
 .../apache/hadoop/ozone/recon/ReconConstants.java  |    1 +
 .../hadoop/ozone/recon/ReconResponseUtils.java     |   84 ++
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  108 ++
 .../hadoop/ozone/recon/api/NSSummaryEndpoint.java  |   25 +-
 .../ozone/recon/api/OMDBInsightEndpoint.java       |  655 +++++++++++-
 .../ozone/recon/api/handlers/EntityHandler.java    |    2 +-
 .../ozone/recon/api/handlers/FSOBucketHandler.java |    1 -
 .../ozone/recon/api/types/KeyEntityInfo.java       |   42 +-
 .../ozone/recon/api/types/ListKeysResponse.java    |  114 ++
 .../hadoop/ozone/recon/api/types/ParamInfo.java    |  133 +++
 .../ozone/recon/OMMetadataManagerTestUtils.java    |   57 +-
 .../ozone/recon/api/TestOmDBInsightEndPoint.java   | 1116 +++++++++++++++++++-
 12 files changed, 2304 insertions(+), 34 deletions(-)

diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 9c79a869c4..6dbc4746ac 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -44,6 +44,7 @@ public final class ReconConstants {
   public static final String DEFAULT_OPEN_KEY_INCLUDE_NON_FSO = "false";
   public static final String DEFAULT_OPEN_KEY_INCLUDE_FSO = "false";
   public static final String DEFAULT_FETCH_COUNT = "1000";
+  public static final String DEFAULT_KEY_SIZE = "0";
   public static final String DEFAULT_BATCH_NUMBER = "1";
   public static final String RECON_QUERY_BATCH_PARAM = "batchNum";
   public static final String RECON_QUERY_PREVKEY = "prevKey";
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconResponseUtils.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconResponseUtils.java
new file mode 100644
index 0000000000..41235ae542
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconResponseUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon;
+
+import com.google.inject.Singleton;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Recon API Response Utility class.
+ */
+@Singleton
+public final class ReconResponseUtils {
+
+  // Declared a private constructor to avoid checkstyle issues.
+  private ReconResponseUtils() {
+
+  }
+
+  /**
+   * Returns a response indicating that no keys matched the search prefix.
+   *
+   * @param startPrefix The search prefix that was used.
+   * @return The response indicating that no keys matched the search prefix.
+   */
+  public static Response noMatchedKeysResponse(String startPrefix) {
+    String jsonResponse = String.format(
+        "{\"message\": \"No keys matched the search prefix: '%s'.\"}",
+        startPrefix);
+    return Response.status(Response.Status.NOT_FOUND)
+        .entity(jsonResponse)
+        .type(MediaType.APPLICATION_JSON)
+        .build();
+  }
+
+  /**
+   * Utility method to create a bad request response with a custom message.
+   * Which means the request sent by the client to the server is incorrect
+   * or malformed and cannot be processed by the server.
+   *
+   * @param message The message to include in the response body.
+   * @return A Response object configured with the provided message.
+   */
+  public static Response createBadRequestResponse(String message) {
+    String jsonResponse = String.format("{\"message\": \"%s\"}", message);
+    return Response.status(Response.Status.BAD_REQUEST)
+        .entity(jsonResponse)
+        .type(MediaType.APPLICATION_JSON)
+        .build();
+  }
+
+  /**
+   * Utility method to create an internal server error response with a custom 
message.
+   * Which means the server encountered an unexpected condition that prevented 
it
+   * from fulfilling the request.
+   *
+   * @param message The message to include in the response body.
+   * @return A Response object configured with the provided message.
+   */
+  public static Response createInternalServerErrorResponse(String message) {
+    String jsonResponse = String.format("{\"message\": \"%s\"}", message);
+    return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+        .entity(jsonResponse)
+        .type(MediaType.APPLICATION_JSON)
+        .build();
+  }
+}
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 76b601b1c0..fe920b7098 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -29,8 +29,13 @@ import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import java.util.TimeZone;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,6 +44,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -65,6 +71,8 @@ import static org.jooq.impl.DSL.select;
 import static org.jooq.impl.DSL.using;
 
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
 import org.apache.hadoop.ozone.recon.api.types.DUResponse;
@@ -80,6 +88,8 @@ import org.jooq.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
+
 /**
  * Recon Utility class.
  */
@@ -509,4 +519,102 @@ public class ReconUtils {
   public static void setLogger(Logger logger) {
     log = logger;
   }
+
+  /**
+   * Return if all OMDB tables that will be used are initialized.
+   * @return if tables are initialized
+   */
+  public static boolean isInitializationComplete(ReconOMMetadataManager 
omMetadataManager) {
+    if (omMetadataManager == null) {
+      return false;
+    }
+    return omMetadataManager.getVolumeTable() != null
+        && omMetadataManager.getBucketTable() != null
+        && omMetadataManager.getDirectoryTable() != null
+        && omMetadataManager.getFileTable() != null
+        && omMetadataManager.getKeyTable(BucketLayout.LEGACY) != null;
+  }
+
+  /**
+   * Converts string date in a provided format to server timezone's epoch 
milllioseconds.
+   *
+   * @param dateString
+   * @param dateFormat
+   * @param timeZone
+   * @return the epoch milliseconds representation of the date.
+   * @throws ParseException
+   */
+  public static long convertToEpochMillis(String dateString, String 
dateFormat, TimeZone timeZone) {
+    String localDateFormat = dateFormat;
+    try {
+      if (StringUtils.isEmpty(dateString)) {
+        return Instant.now().toEpochMilli();
+      }
+      if (StringUtils.isEmpty(dateFormat)) {
+        localDateFormat = "MM-dd-yyyy HH:mm:ss";
+      }
+      if (null == timeZone) {
+        timeZone = TimeZone.getDefault();
+      }
+      SimpleDateFormat sdf = new SimpleDateFormat(localDateFormat);
+      sdf.setTimeZone(timeZone); // Set server's timezone
+      Date date = sdf.parse(dateString);
+      return date.getTime(); // Convert to epoch milliseconds
+    } catch (ParseException parseException) {
+      log.error("Date parse exception for date: {} in format: {} -> {}", 
dateString, localDateFormat, parseException);
+      return Instant.now().toEpochMilli();
+    } catch (Exception exception) {
+      log.error("Unexpected error while parsing date: {} in format: {} -> {}", 
dateString, localDateFormat, exception);
+      return Instant.now().toEpochMilli();
+    }
+  }
+
+  /**
+   * Validates volume or bucket names according to specific rules.
+   *
+   * @param resName The name to validate (volume or bucket).
+   * @return A Response object if validation fails, or null if the name is 
valid.
+   */
+  public static Response validateNames(String resName)
+      throws IllegalArgumentException {
+    if (resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH ||
+        resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH) {
+      throw new IllegalArgumentException(
+          "Bucket or Volume name length should be between " +
+              OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH + " and " +
+              OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH);
+    }
+
+    if (resName.charAt(0) == '.' || resName.charAt(0) == '-' ||
+        resName.charAt(resName.length() - 1) == '.' ||
+        resName.charAt(resName.length() - 1) == '-') {
+      throw new IllegalArgumentException(
+          "Bucket or Volume name cannot start or end with " +
+              "hyphen or period");
+    }
+
+    // Regex to check for lowercase letters, numbers, hyphens, underscores, 
and periods only.
+    if (!resName.matches("^[a-z0-9._-]+$")) {
+      throw new IllegalArgumentException(
+          "Bucket or Volume name can only contain lowercase " +
+              "letters, numbers, hyphens, underscores, and periods");
+    }
+
+    // If all checks pass, the name is valid
+    return null;
+  }
+
+  /**
+   * Constructs an object path with the given IDs.
+   *
+   * @param ids The IDs to construct the object path with.
+   * @return The constructed object path.
+   */
+  public static String constructObjectPathWithPrefix(long... ids) {
+    StringBuilder pathBuilder = new StringBuilder();
+    for (long id : ids) {
+      pathBuilder.append(OM_KEY_PREFIX).append(id);
+    }
+    return pathBuilder.toString();
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java
index 71040b9fdf..35901195dc 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NSSummaryEndpoint.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.ozone.recon.api;
 
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
-import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.handlers.EntityHandler;
 import org.apache.hadoop.ozone.recon.api.types.NamespaceSummaryResponse;
 import org.apache.hadoop.ozone.recon.api.types.DUResponse;
@@ -78,7 +78,7 @@ public class NSSummaryEndpoint {
     }
 
     NamespaceSummaryResponse namespaceSummaryResponse;
-    if (!isInitializationComplete()) {
+    if (!ReconUtils.isInitializationComplete(omMetadataManager)) {
       namespaceSummaryResponse =
           NamespaceSummaryResponse.newBuilder()
               .setEntityType(EntityType.UNKNOWN)
@@ -119,7 +119,7 @@ public class NSSummaryEndpoint {
     }
 
     DUResponse duResponse = new DUResponse();
-    if (!isInitializationComplete()) {
+    if (!ReconUtils.isInitializationComplete(omMetadataManager)) {
       duResponse.setStatus(ResponseStatus.INITIALIZING);
       return Response.ok(duResponse).build();
     }
@@ -150,7 +150,7 @@ public class NSSummaryEndpoint {
     }
 
     QuotaUsageResponse quotaUsageResponse = new QuotaUsageResponse();
-    if (!isInitializationComplete()) {
+    if (!ReconUtils.isInitializationComplete(omMetadataManager)) {
       quotaUsageResponse.setResponseCode(ResponseStatus.INITIALIZING);
       return Response.ok(quotaUsageResponse).build();
     }
@@ -181,7 +181,7 @@ public class NSSummaryEndpoint {
 
     FileSizeDistributionResponse distResponse =
         new FileSizeDistributionResponse();
-    if (!isInitializationComplete()) {
+    if (!ReconUtils.isInitializationComplete(omMetadataManager)) {
       distResponse.setStatus(ResponseStatus.INITIALIZING);
       return Response.ok(distResponse).build();
     }
@@ -195,19 +195,4 @@ public class NSSummaryEndpoint {
     return Response.ok(distResponse).build();
   }
 
-  /**
-   * Return if all OMDB tables that will be used are initialized.
-   * @return if tables are initialized
-   */
-  private boolean isInitializationComplete() {
-    if (omMetadataManager == null) {
-      return false;
-    }
-    return omMetadataManager.getVolumeTable() != null
-        && omMetadataManager.getBucketTable() != null
-        && omMetadataManager.getDirectoryTable() != null
-        && omMetadataManager.getFileTable() != null
-        && omMetadataManager.getKeyTable(BucketLayout.LEGACY) != null;
-  }
-
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index baa9c522be..f4aaf50dfc 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -24,14 +24,20 @@ import 
org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.recon.ReconResponseUtils;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler;
 import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo;
 import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
+import org.apache.hadoop.ozone.recon.api.types.ListKeysResponse;
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.api.types.ParamInfo;
+import org.apache.hadoop.ozone.recon.api.types.ResponseStatus;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
-import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
-import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
 import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
 import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
@@ -49,10 +55,17 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
@@ -60,12 +73,16 @@ import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
 import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
 import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_KEY_SIZE;
 import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
 import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_FSO;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_NON_FSO;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_FSO;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_NON_FSO;
+import static 
org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler;
+import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath;
+import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath;
 
 
 /**
@@ -83,16 +100,12 @@ import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUD
 @AdminOnly
 public class OMDBInsightEndpoint {
 
-  @Inject
-  private ContainerEndpoint containerEndpoint;
-  @Inject
-  private ReconContainerMetadataManager reconContainerMetadataManager;
   private final ReconOMMetadataManager omMetadataManager;
-  private final ReconContainerManager containerManager;
   private static final Logger LOG =
       LoggerFactory.getLogger(OMDBInsightEndpoint.class);
   private final GlobalStatsDao globalStatsDao;
   private ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager;
+  private final OzoneStorageContainerManager reconSCM;
 
 
   @Inject
@@ -101,11 +114,10 @@ public class OMDBInsightEndpoint {
                              GlobalStatsDao globalStatsDao,
                              ReconNamespaceSummaryManagerImpl
                                  reconNamespaceSummaryManager) {
-    this.containerManager =
-        (ReconContainerManager) reconSCM.getContainerManager();
     this.omMetadataManager = omMetadataManager;
     this.globalStatsDao = globalStatsDao;
     this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
+    this.reconSCM = reconSCM;
   }
 
   /**
@@ -674,6 +686,631 @@ public class OMDBInsightEndpoint {
     return Response.ok(dirSummary).build();
   }
 
+  /**
+   * This API will list out limited 'count' number of keys after applying 
below filters in API parameters:
+   * Default Values of API param filters:
+   *    -- replicationType - empty string and filter will not be applied, so 
list out all keys irrespective of
+   *       replication type.
+   *    -- creationTime - empty string and filter will not be applied, so list 
out keys irrespective of age.
+   *    -- keySize - 0 bytes, which means all keys greater than zero bytes 
will be listed, effectively all.
+   *    -- startPrefix - /, API assumes that startPrefix path always starts 
with /. E.g. /volume/bucket
+   *    -- prevKey - ""
+   *    -- limit - 1000
+   *
+   * @param replicationType Filter for RATIS or EC replication keys
+   * @param creationDate Filter for keys created after creationDate in 
"MM-dd-yyyy HH:mm:ss" string format.
+   * @param keySize Filter for Keys greater than keySize in bytes.
+   * @param startPrefix Filter for startPrefix path.
+   * @param prevKey rocksDB last key of page requested.
+   * @param limit Filter for limited count of keys.
+   *
+   * @return the list of keys in JSON structured format as per respective 
bucket layout.
+   *
+   * Now lets consider, we have following OBS, LEGACY and FSO bucket key/files 
namespace tree structure
+   *
+   * For OBS Bucket
+   *
+   * /volume1/obs-bucket/key1
+   * /volume1/obs-bucket/key1/key2
+   * /volume1/obs-bucket/key1/key2/key3
+   * /volume1/obs-bucket/key4
+   * /volume1/obs-bucket/key5
+   * /volume1/obs-bucket/key6
+   * For LEGACY Bucket
+   *
+   * /volume1/legacy-bucket/key
+   * /volume1/legacy-bucket/key1/key2
+   * /volume1/legacy-bucket/key1/key2/key3
+   * /volume1/legacy-bucket/key4
+   * /volume1/legacy-bucket/key5
+   * /volume1/legacy-bucket/key6
+   * For FSO Bucket
+   *
+   * /volume1/fso-bucket/dir1/dir2/dir3
+   * /volume1/fso-bucket/dir1/testfile
+   * /volume1/fso-bucket/dir1/file1
+   * /volume1/fso-bucket/dir1/dir2/testfile
+   * /volume1/fso-bucket/dir1/dir2/file1
+   * /volume1/fso-bucket/dir1/dir2/dir3/testfile
+   * /volume1/fso-bucket/dir1/dir2/dir3/file1
+   * Input Request for OBS bucket:
+   *
+   *    
`api/v1/keys/listKeys?startPrefix=/volume1/obs-bucket&limit=2&replicationType=RATIS`
+   * Output Response:
+   *
+   * {
+   *     "status": "OK",
+   *     "path": "/volume1/obs-bucket",
+   *     "replicatedDataSize": 62914560,
+   *     "unReplicatedDataSize": 62914560,
+   *     "lastKey": "/volume1/obs-bucket/key6",
+   *     "keys": [
+   *         {
+   *             "key": "/volume1/obs-bucket/key1",
+   *             "path": "volume1/obs-bucket/key1",
+   *             "size": 10485760,
+   *             "replicatedSize": 10485760,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "ONE",
+   *                 "requiredNodes": 1,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781418742,
+   *             "modificationTime": 1715781419762,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": "/volume1/obs-bucket/key1/key2",
+   *             "path": "volume1/obs-bucket/key1/key2",
+   *             "size": 10485760,
+   *             "replicatedSize": 10485760,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "ONE",
+   *                 "requiredNodes": 1,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781421716,
+   *             "modificationTime": 1715781422723,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": "/volume1/obs-bucket/key1/key2/key3",
+   *             "path": "volume1/obs-bucket/key1/key2/key3",
+   *             "size": 10485760,
+   *             "replicatedSize": 10485760,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "ONE",
+   *                 "requiredNodes": 1,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781424718,
+   *             "modificationTime": 1715781425598,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": "/volume1/obs-bucket/key4",
+   *             "path": "volume1/obs-bucket/key4",
+   *             "size": 10485760,
+   *             "replicatedSize": 10485760,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "ONE",
+   *                 "requiredNodes": 1,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781427561,
+   *             "modificationTime": 1715781428407,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": "/volume1/obs-bucket/key5",
+   *             "path": "volume1/obs-bucket/key5",
+   *             "size": 10485760,
+   *             "replicatedSize": 10485760,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "ONE",
+   *                 "requiredNodes": 1,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781430347,
+   *             "modificationTime": 1715781431185,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": "/volume1/obs-bucket/key6",
+   *             "path": "volume1/obs-bucket/key6",
+   *             "size": 10485760,
+   *             "replicatedSize": 10485760,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "ONE",
+   *                 "requiredNodes": 1,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781433154,
+   *             "modificationTime": 1715781433962,
+   *             "isKey": true
+   *         }
+   *     ]
+   * }
+   * Input Request for FSO bucket:
+   *
+   *        
`api/v1/keys/listKeys?startPrefix=/volume1/fso-bucket&limit=2&replicationType=RATIS`
+   * Output Response:
+   *
+   * {
+   *     "status": "OK",
+   *     "path": "/volume1/fso-bucket",
+   *     "replicatedDataSize": 188743680,
+   *     "unReplicatedDataSize": 62914560,
+   *     "lastKey": 
"/-9223372036854775552/-9223372036854774016/-9223372036854773503/testfile",
+   *     "keys": [
+   *         {
+   *             "key": 
"/-9223372036854775552/-9223372036854774016/-9223372036854773501/file1",
+   *             "path": "volume1/fso-bucket/dir1/dir2/dir3/file1",
+   *             "size": 10485760,
+   *             "replicatedSize": 31457280,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "THREE",
+   *                 "requiredNodes": 3,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781411785,
+   *             "modificationTime": 1715781415119,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": 
"/-9223372036854775552/-9223372036854774016/-9223372036854773501/testfile",
+   *             "path": "volume1/fso-bucket/dir1/dir2/dir3/testfile",
+   *             "size": 10485760,
+   *             "replicatedSize": 31457280,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "THREE",
+   *                 "requiredNodes": 3,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781409146,
+   *             "modificationTime": 1715781409882,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": 
"/-9223372036854775552/-9223372036854774016/-9223372036854773502/file1",
+   *             "path": "volume1/fso-bucket/dir1/dir2/file1",
+   *             "size": 10485760,
+   *             "replicatedSize": 31457280,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "THREE",
+   *                 "requiredNodes": 3,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781406333,
+   *             "modificationTime": 1715781407140,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": 
"/-9223372036854775552/-9223372036854774016/-9223372036854773502/testfile",
+   *             "path": "volume1/fso-bucket/dir1/dir2/testfile",
+   *             "size": 10485760,
+   *             "replicatedSize": 31457280,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "THREE",
+   *                 "requiredNodes": 3,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781403655,
+   *             "modificationTime": 1715781404460,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": 
"/-9223372036854775552/-9223372036854774016/-9223372036854773503/file1",
+   *             "path": "volume1/fso-bucket/dir1/file1",
+   *             "size": 10485760,
+   *             "replicatedSize": 31457280,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "THREE",
+   *                 "requiredNodes": 3,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781400980,
+   *             "modificationTime": 1715781401768,
+   *             "isKey": true
+   *         },
+   *         {
+   *             "key": 
"/-9223372036854775552/-9223372036854774016/-9223372036854773503/testfile",
+   *             "path": "volume1/fso-bucket/dir1/testfile",
+   *             "size": 10485760,
+   *             "replicatedSize": 31457280,
+   *             "replicationInfo": {
+   *                 "replicationFactor": "THREE",
+   *                 "requiredNodes": 3,
+   *                 "replicationType": "RATIS"
+   *             },
+   *             "creationTime": 1715781397636,
+   *             "modificationTime": 1715781398919,
+   *             "isKey": true
+   *         }
+   *     ]
+   * }
+   *
+   * ********************************************************
+   * @throws IOException
+   */
+  @GET
+  @Path("/listKeys")
+  @SuppressWarnings("methodlength")
+  public Response listKeys(@QueryParam("replicationType") String 
replicationType,
+                           @QueryParam("creationDate") String creationDate,
+                           @DefaultValue(DEFAULT_KEY_SIZE) 
@QueryParam("keySize") long keySize,
+                           @DefaultValue(OM_KEY_PREFIX) 
@QueryParam("startPrefix") String startPrefix,
+                           @DefaultValue(StringUtils.EMPTY) 
@QueryParam(RECON_QUERY_PREVKEY) String prevKey,
+                           @DefaultValue(DEFAULT_FETCH_COUNT) 
@QueryParam("limit") int limit) {
+
+
+    // This API supports startPrefix from bucket level.
+    if (startPrefix == null || startPrefix.length() == 0) {
+      return Response.status(Response.Status.BAD_REQUEST).build();
+    }
+    String[] names = startPrefix.split(OM_KEY_PREFIX);
+    if (names.length < 3) {
+      return Response.status(Response.Status.BAD_REQUEST).build();
+    }
+    ListKeysResponse listKeysResponse = new ListKeysResponse();
+    if (!ReconUtils.isInitializationComplete(omMetadataManager)) {
+      listKeysResponse.setStatus(ResponseStatus.INITIALIZING);
+      return Response.ok(listKeysResponse).build();
+    }
+    ParamInfo paramInfo = new ParamInfo(replicationType, creationDate, 
keySize, startPrefix, prevKey,
+        limit, false, "");
+    Response response = getListKeysResponse(paramInfo);
+    if ((response.getStatus() != Response.Status.OK.getStatusCode()) &&
+        (response.getStatus() != Response.Status.NOT_FOUND.getStatusCode())) {
+      return response;
+    }
+    if (response.getEntity() instanceof ListKeysResponse) {
+      listKeysResponse = (ListKeysResponse) response.getEntity();
+    }
+
+    List<KeyEntityInfo> keyInfoList = listKeysResponse.getKeys();
+    if (!keyInfoList.isEmpty()) {
+      listKeysResponse.setLastKey(keyInfoList.get(keyInfoList.size() - 
1).getKey());
+    }
+    return Response.ok(listKeysResponse).build();
+  }
+
+  private Response getListKeysResponse(ParamInfo paramInfo) {
+    try {
+      paramInfo.setLimit(Math.max(0, paramInfo.getLimit())); // Ensure limit 
is non-negative
+      ListKeysResponse listKeysResponse = new ListKeysResponse();
+      listKeysResponse.setPath(paramInfo.getStartPrefix());
+      long replicatedTotal = 0;
+      long unreplicatedTotal = 0;
+      boolean keysFound = false; // Flag to track if any keys are found
+
+      // Search keys from non-FSO layout.
+      Map<String, OmKeyInfo> obsKeys;
+      Table<String, OmKeyInfo> keyTable =
+          omMetadataManager.getKeyTable(BucketLayout.LEGACY);
+      obsKeys = retrieveKeysFromTable(keyTable, paramInfo);
+      for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) {
+        keysFound = true;
+        KeyEntityInfo keyEntityInfo =
+            createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue());
+
+        listKeysResponse.getKeys().add(keyEntityInfo);
+        replicatedTotal += entry.getValue().getReplicatedSize();
+        unreplicatedTotal += entry.getValue().getDataSize();
+      }
+
+      // Search keys from FSO layout.
+      Map<String, OmKeyInfo> fsoKeys = searchKeysInFSO(paramInfo);
+      for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) {
+        keysFound = true;
+        KeyEntityInfo keyEntityInfo =
+            createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue());
+
+        listKeysResponse.getKeys().add(keyEntityInfo);
+        replicatedTotal += entry.getValue().getReplicatedSize();
+        unreplicatedTotal += entry.getValue().getDataSize();
+      }
+
+      // If no keys were found, return a response indicating that no keys 
matched
+      if (!keysFound) {
+        return 
ReconResponseUtils.noMatchedKeysResponse(paramInfo.getStartPrefix());
+      }
+
+      // Set the aggregated totals in the response
+      listKeysResponse.setReplicatedDataSize(replicatedTotal);
+      listKeysResponse.setUnReplicatedDataSize(unreplicatedTotal);
+
+      return Response.ok(listKeysResponse).build();
+    } catch (IOException e) {
+      return ReconResponseUtils.createInternalServerErrorResponse(
+          "Error listing keys from OM DB: " + e.getMessage());
+    } catch (RuntimeException e) {
+      return ReconResponseUtils.createInternalServerErrorResponse(
+          "Unexpected runtime error while searching keys in OM DB: " + 
e.getMessage());
+    } catch (Exception e) {
+      return ReconResponseUtils.createInternalServerErrorResponse(
+          "Error listing keys from OM DB: " + e.getMessage());
+    }
+  }
+
+  public Map<String, OmKeyInfo> searchKeysInFSO(ParamInfo paramInfo)
+      throws IOException {
+    int originalLimit = paramInfo.getLimit();
+    Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>();
+    // Convert the search prefix to an object path for FSO buckets
+    String startPrefixObjectPath = 
convertStartPrefixPathToObjectIdPath(paramInfo.getStartPrefix());
+    String[] names = parseRequestPath(startPrefixObjectPath);
+    Table<String, OmKeyInfo> fileTable =
+        omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    // If names.length > 2, then the search prefix is at the level above 
bucket level hence
+    // no need to find parent or extract id's or find subpaths as the 
fileTable is
+    // suitable for volume and bucket level search
+    if (names.length > 2) {
+      // Fetch the parent ID to search for
+      long parentId = Long.parseLong(names[names.length - 1]);
+
+      // Fetch the nameSpaceSummary for the parent ID
+      NSSummary parentSummary =
+          reconNamespaceSummaryManager.getNSSummary(parentId);
+      if (parentSummary == null) {
+        return matchedKeys;
+      }
+      List<String> subPaths = new ArrayList<>();
+      // Add the initial search prefix object path because it can have both 
files and subdirectories with files.
+      subPaths.add(startPrefixObjectPath);
+
+      // Recursively gather all subpaths
+      gatherSubPaths(parentId, subPaths, Long.parseLong(names[0]), 
Long.parseLong(names[1]));
+      // Iterate over the subpaths and retrieve the files
+      for (String subPath : subPaths) {
+        paramInfo.setStartPrefix(subPath);
+        matchedKeys.putAll(
+            retrieveKeysFromTable(fileTable, paramInfo));
+        paramInfo.setLimit(originalLimit - matchedKeys.size());
+        if (matchedKeys.size() >= originalLimit) {
+          break;
+        }
+      }
+      return matchedKeys;
+    }
+
+    paramInfo.setStartPrefix(startPrefixObjectPath);
+    // Iterate over for bucket and volume level search
+    matchedKeys.putAll(
+        retrieveKeysFromTable(fileTable, paramInfo));
+    return matchedKeys;
+  }
+
+  /**
+   * Finds all subdirectories under a parent directory in an FSO bucket. It 
builds
+   * a list of paths for these subdirectories. These sub-directories are then 
used
+   * to search for files in the fileTable.
+   * <p>
+   * How it works:
+   * - Starts from a parent directory identified by parentId.
+   * - Looks through all child directories of this parent.
+   * - For each child, it creates a path that starts with 
volumeID/bucketID/parentId,
+   * following our fileTable format
+   * - Adds these paths to a list and explores each child further for more 
subdirectories.
+   *
+   * @param parentId The ID of the directory we start exploring from.
+   * @param subPaths A list where we collect paths to all subdirectories.
+   * @param volumeID
+   * @param bucketID
+   * @throws IOException If there are problems accessing directory information.
+   */
+  private void gatherSubPaths(long parentId, List<String> subPaths,
+                              long volumeID, long bucketID) throws IOException 
{
+    // Fetch the NSSummary object for parentId
+    NSSummary parentSummary =
+        reconNamespaceSummaryManager.getNSSummary(parentId);
+    if (parentSummary == null) {
+      return;
+    }
+
+    Set<Long> childDirIds = parentSummary.getChildDir();
+    for (Long childId : childDirIds) {
+      // Fetch the NSSummary for each child directory
+      NSSummary childSummary =
+          reconNamespaceSummaryManager.getNSSummary(childId);
+      if (childSummary != null) {
+        String subPath =
+            ReconUtils.constructObjectPathWithPrefix(volumeID, bucketID, 
childId);
+        // Add to subPaths
+        subPaths.add(subPath);
+        // Recurse into this child directory
+        gatherSubPaths(childId, subPaths, volumeID, bucketID);
+      }
+    }
+  }
+
+
+  /**
+   * Converts a startPrefix path into an objectId path for FSO buckets, using 
IDs.
+   * <p>
+   * This method transforms a user-provided path (e.g., "volume/bucket/dir1") 
into
+   * a database-friendly format ("/volumeID/bucketID/ParentId/") by replacing 
names
+   * with their corresponding IDs. It simplifies database queries for FSO 
bucket operations.
+   *
+   * @param startPrefixPath The path to be converted.
+   * @return The objectId path as "/volumeID/bucketID/ParentId/".
+   * @throws IOException If database access fails.
+   */
+  public String convertStartPrefixPathToObjectIdPath(String startPrefixPath)
+      throws IOException {
+
+    String[] names = parseRequestPath(
+        normalizePath(startPrefixPath, BucketLayout.FILE_SYSTEM_OPTIMIZED));
+
+    // Root-Level :- Return the original path
+    if (names.length == 0) {
+      return startPrefixPath;
+    }
+
+    // Volume-Level :- Fetch the volumeID
+    String volumeName = names[0];
+    ReconUtils.validateNames(volumeName);
+    String volumeKey = omMetadataManager.getVolumeKey(volumeName);
+    long volumeId = omMetadataManager.getVolumeTable().getSkipCache(volumeKey)
+        .getObjectID();
+    if (names.length == 1) {
+      return ReconUtils.constructObjectPathWithPrefix(volumeId);
+    }
+
+    // Bucket-Level :- Fetch the bucketID
+    String bucketName = names[1];
+    ReconUtils.validateNames(bucketName);
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo bucketInfo =
+        omMetadataManager.getBucketTable().getSkipCache(bucketKey);
+    long bucketId = bucketInfo.getObjectID();
+    if (names.length == 2) {
+      return ReconUtils.constructObjectPathWithPrefix(volumeId, bucketId);
+    }
+
+    // Fetch the immediate parentID which could be a directory or the bucket 
itself
+    BucketHandler handler =
+        getBucketHandler(reconNamespaceSummaryManager, omMetadataManager,
+            reconSCM, bucketInfo);
+    long dirObjectId = -1;
+    try {
+      OmDirectoryInfo dirInfo = handler.getDirInfo(names);
+      if (null != dirInfo) {
+        dirObjectId = dirInfo.getObjectID();
+      } else {
+        throw new IllegalArgumentException("Not valid path");
+      }
+    } catch (Exception ioe) {
+      throw new IllegalArgumentException("Not valid path: " + ioe);
+    }
+    return ReconUtils.constructObjectPathWithPrefix(volumeId, bucketId, 
dirObjectId);
+  }
+
+  /**
+   * Common method to retrieve keys from a table based on a search prefix and 
a limit.
+   *
+   * @param table     The table to retrieve keys from.
+   * @param paramInfo The stats object holds total count, count and limit.
+   * @return A map of keys and their corresponding OmKeyInfo objects.
+   * @throws IOException If there are problems accessing the table.
+   */
+  private Map<String, OmKeyInfo> retrieveKeysFromTable(
+      Table<String, OmKeyInfo> table, ParamInfo paramInfo)
+      throws IOException {
+    boolean skipPrevKey = false;
+    String seekKey = paramInfo.getPrevKey();
+    Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>();
+    try (
+        TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyIter = table.iterator()) {
+
+      if (!paramInfo.isSkipPrevKeyDone() && StringUtils.isNotBlank(seekKey)) {
+        skipPrevKey = true;
+        Table.KeyValue<String, OmKeyInfo> seekKeyValue =
+            keyIter.seek(seekKey);
+
+        // check if RocksDB was able to seek correctly to the given key prefix
+        // if not, then return empty result
+        // In case of an empty prevKeyPrefix, all the keys are returned
+        if (seekKeyValue == null || 
(!seekKeyValue.getKey().equals(paramInfo.getPrevKey()))) {
+          return matchedKeys;
+        }
+      } else {
+        keyIter.seek(paramInfo.getStartPrefix());
+      }
+
+      while (keyIter.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> entry = keyIter.next();
+        String dbKey = entry.getKey();
+        if (!dbKey.startsWith(paramInfo.getStartPrefix())) {
+          break; // Exit the loop if the key no longer matches the prefix
+        }
+        if (skipPrevKey && dbKey.equals(paramInfo.getPrevKey())) {
+          paramInfo.setSkipPrevKeyDone(true);
+          continue;
+        }
+        if (applyFilters(entry, paramInfo)) {
+          matchedKeys.put(dbKey, entry.getValue());
+          paramInfo.setLastKey(dbKey);
+          if (matchedKeys.size() >= paramInfo.getLimit()) {
+            break;
+          }
+        }
+      }
+    } catch (IOException exception) {
+      LOG.error("Error retrieving keys from table for path: {}", 
paramInfo.getStartPrefix(), exception);
+      throw exception;
+    }
+    return matchedKeys;
+  }
+
+  private boolean applyFilters(Table.KeyValue<String, OmKeyInfo> entry, 
ParamInfo paramInfo) throws IOException {
+
+    LOG.debug("Applying filters on : {}", entry.getKey());
+
+    long epochMillis =
+        ReconUtils.convertToEpochMillis(paramInfo.getCreationDate(), 
"MM-dd-yyyy HH:mm:ss", TimeZone.getDefault());
+    Predicate<Table.KeyValue<String, OmKeyInfo>> keyAgeFilter = keyData -> {
+      try {
+        return keyData.getValue().getCreationTime() >= epochMillis;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+    Predicate<Table.KeyValue<String, OmKeyInfo>> keyReplicationFilter =
+        keyData -> {
+          try {
+            return 
keyData.getValue().getReplicationConfig().getReplicationType().name()
+                .equals(paramInfo.getReplicationType());
+          } catch (IOException e) {
+            try {
+              throw new IOException(e);
+            } catch (IOException ex) {
+              throw new RuntimeException(ex);
+            }
+          }
+        };
+    Predicate<Table.KeyValue<String, OmKeyInfo>> keySizeFilter = keyData -> {
+      try {
+        return keyData.getValue().getDataSize() >= paramInfo.getKeySize();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    List<Table.KeyValue<String, OmKeyInfo>> filteredKeyList = Stream.of(entry)
+        .filter(keyData -> !StringUtils.isEmpty(paramInfo.getCreationDate()) ? 
keyAgeFilter.test(keyData) : true)
+        .filter(
+            keyData -> !StringUtils.isEmpty(paramInfo.getReplicationType()) ? 
keyReplicationFilter.test(keyData) : true)
+        .filter(keySizeFilter)
+        .collect(Collectors.toList());
+
+    LOG.debug("After applying filter on : {}, filtered list size: {}", 
entry.getKey(), filteredKeyList.size());
+
+    return (filteredKeyList.size() > 0);
+  }
+
+  /**
+   * Creates a KeyEntityInfo object from an OmKeyInfo object and the 
corresponding key.
+   *
+   * @param dbKey   The key in the database corresponding to the OmKeyInfo 
object.
+   * @param keyInfo The OmKeyInfo object to create the KeyEntityInfo from.
+   * @return The KeyEntityInfo object created from the OmKeyInfo object and 
the key.
+   */
+  private KeyEntityInfo createKeyEntityInfoFromOmKeyInfo(String dbKey,
+                                                         OmKeyInfo keyInfo) 
throws IOException {
+    KeyEntityInfo keyEntityInfo = new KeyEntityInfo();
+    keyEntityInfo.setKey(dbKey); // Set the DB key
+    keyEntityInfo.setPath(ReconUtils.constructFullPath(keyInfo, 
reconNamespaceSummaryManager,
+        omMetadataManager));
+    keyEntityInfo.setSize(keyInfo.getDataSize());
+    keyEntityInfo.setCreationTime(keyInfo.getCreationTime());
+    keyEntityInfo.setModificationTime(keyInfo.getModificationTime());
+    keyEntityInfo.setReplicatedSize(keyInfo.getReplicatedSize());
+    keyEntityInfo.setReplicationConfig(keyInfo.getReplicationConfig());
+    return keyEntityInfo;
+  }
+
   private void createSummaryForDeletedDirectories(
       Map<String, Long> dirSummary) {
     // Fetch the necessary metrics for deleted directories.
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/EntityHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/EntityHandler.java
index f2bcb58d35..e3ecbffa71 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/EntityHandler.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/EntityHandler.java
@@ -322,7 +322,7 @@ public abstract class EntityHandler {
    * @param bucketLayout
    * @return A normalized path
    */
-  private static String normalizePath(String path, BucketLayout bucketLayout) {
+  public static String normalizePath(String path, BucketLayout bucketLayout) {
     if (bucketLayout == BucketLayout.OBJECT_STORE) {
       return OM_KEY_PREFIX + OmUtils.normalizePathUptoBucket(path);
     }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/FSOBucketHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/FSOBucketHandler.java
index 8a1c5babe7..4ae18605b0 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/FSOBucketHandler.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/FSOBucketHandler.java
@@ -300,5 +300,4 @@ public class FSOBucketHandler extends BucketHandler {
         .getDirectoryTable().getSkipCache(dirKey);
     return dirInfo;
   }
-
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyEntityInfo.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyEntityInfo.java
index cf02c503f4..d7cd359919 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyEntityInfo.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyEntityInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.recon.api.types;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 
@@ -38,6 +39,7 @@ public class KeyEntityInfo {
   private String path;
 
   @JsonProperty("inStateSince")
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
   private long inStateSince;
 
   @JsonProperty("size")
@@ -49,13 +51,27 @@ public class KeyEntityInfo {
   @JsonProperty("replicationInfo")
   private ReplicationConfig replicationConfig;
 
+  /** key creation time. */
+  @JsonProperty("creationTime")
+  private long creationTime;
+
+  /** key modification time. */
+  @JsonProperty("modificationTime")
+  private long modificationTime;
+
+  /** Indicate if the path is a key for Web UI. */
+  @JsonProperty("isKey")
+  private boolean isKey;
+
   public KeyEntityInfo() {
     key = "";
     path = "";
-    inStateSince = Instant.now().toEpochMilli();
     size = 0L;
     replicatedSize = 0L;
     replicationConfig = null;
+    creationTime = Instant.now().toEpochMilli();
+    modificationTime = Instant.now().toEpochMilli();
+    isKey = true;
   }
 
   public String getKey() {
@@ -106,4 +122,28 @@ public class KeyEntityInfo {
       ReplicationConfig replicationConfig) {
     this.replicationConfig = replicationConfig;
   }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public void setCreationTime(long creationTime) {
+    this.creationTime = creationTime;
+  }
+
+  public long getModificationTime() {
+    return modificationTime;
+  }
+
+  public void setModificationTime(long modificationTime) {
+    this.modificationTime = modificationTime;
+  }
+
+  public boolean isKey() {
+    return isKey;
+  }
+
+  public void setKey(boolean key) {
+    isKey = key;
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ListKeysResponse.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ListKeysResponse.java
new file mode 100644
index 0000000000..7220060aeb
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ListKeysResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * HTTP Response wrapped for listKeys requests.
+ */
+public class ListKeysResponse {
+  /** Path status. */
+  @JsonProperty("status")
+  private ResponseStatus status;
+
+  /** The current path request. */
+  @JsonProperty("path")
+  private String path;
+
+  /** Amount of data mapped to all keys and files in a cluster across all DNs. 
*/
+  @JsonProperty("replicatedDataSize")
+  private long replicatedDataSize;
+
+  /** Amount of data mapped to all keys and files on a single DN. */
+  @JsonProperty("unReplicatedDataSize")
+  private long unReplicatedDataSize;
+
+  /** last key sent. */
+  @JsonProperty("lastKey")
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  private String lastKey;
+
+  /** list of keys. */
+  @JsonProperty("keys")
+  private List<KeyEntityInfo> keys;
+
+
+  public ListKeysResponse() {
+    this.status = ResponseStatus.OK;
+    this.path = "";
+    this.keys = new ArrayList<>();
+    this.replicatedDataSize = -1L;
+    this.unReplicatedDataSize = -1L;
+    this.lastKey = "";
+  }
+
+  public ResponseStatus getStatus() {
+    return this.status;
+  }
+
+  public void setStatus(ResponseStatus status) {
+    this.status = status;
+  }
+
+  public long getReplicatedDataSize() {
+    return replicatedDataSize;
+  }
+
+  public void setReplicatedDataSize(long replicatedDataSize) {
+    this.replicatedDataSize = replicatedDataSize;
+  }
+
+  public long getUnReplicatedDataSize() {
+    return unReplicatedDataSize;
+  }
+
+  public void setUnReplicatedDataSize(long unReplicatedDataSize) {
+    this.unReplicatedDataSize = unReplicatedDataSize;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public List<KeyEntityInfo> getKeys() {
+    return keys;
+  }
+
+  public void setKeys(List<KeyEntityInfo> keys) {
+    this.keys = keys;
+  }
+
+  public String getLastKey() {
+    return lastKey;
+  }
+
+  public void setLastKey(String lastKey) {
+    this.lastKey = lastKey;
+  }
+
+}
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ParamInfo.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ParamInfo.java
new file mode 100644
index 0000000000..345b042907
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ParamInfo.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.recon.api.types;
+
+/**
+ * Wrapper object for statistics of records of a page in API response.
+ */
+public class ParamInfo {
+
+  /**
+   * start prefix path to start search.
+   */
+  private String startPrefix;
+
+  /**
+   * key size filter.
+   */
+  private long keySize;
+
+  /**
+   * creation date filter for keys to filter.
+   */
+  private String creationDate;
+
+  /**
+   *
+   */
+  private String replicationType;
+
+  /**
+   * limit the number of records to return in API response.
+   */
+  private int limit;
+
+  private String prevKey;
+
+  private String lastKey;
+
+  private boolean skipPrevKeyDone = false;
+
+  /**
+   * counter to track the number of records added in API response.
+   */
+  private long currentCount;
+
+  @SuppressWarnings("parameternumber")
+  public ParamInfo(String replicationType, String creationDate, long keySize, 
String startPrefix, String prevKey,
+                   int limit, boolean skipPrevKeyDone, String lastKey) {
+    this.replicationType = replicationType;
+    this.creationDate = creationDate;
+    this.keySize = keySize;
+    this.startPrefix = startPrefix;
+    this.prevKey = prevKey;
+    this.limit = limit;
+    this.skipPrevKeyDone = skipPrevKeyDone;
+    this.lastKey = lastKey;
+  }
+
+  public String getStartPrefix() {
+    return startPrefix;
+  }
+
+  public void setStartPrefix(String startPrefix) {
+    this.startPrefix = startPrefix;
+  }
+
+  public long getKeySize() {
+    return keySize;
+  }
+
+  public String getCreationDate() {
+    return creationDate;
+  }
+
+  public String getReplicationType() {
+    return replicationType;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  public long getCurrentCount() {
+    return currentCount;
+  }
+
+  public void setCurrentCount(long currentCount) {
+    this.currentCount = currentCount;
+  }
+
+  public String getPrevKey() {
+    return prevKey;
+  }
+
+  public void setPrevKey(String prevKey) {
+    this.prevKey = prevKey;
+  }
+
+  public boolean isSkipPrevKeyDone() {
+    return skipPrevKeyDone;
+  }
+
+  public void setSkipPrevKeyDone(boolean skipPrevKeyDone) {
+    this.skipPrevKeyDone = skipPrevKeyDone;
+  }
+
+  public String getLastKey() {
+    return lastKey;
+  }
+
+  public void setLastKey(String lastKey) {
+    this.lastKey = lastKey;
+  }
+}
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
index a9ed342faa..8a48fc3d5c 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
@@ -39,12 +39,14 @@ import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -65,6 +67,7 @@ import 
org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
  */
 public final class OMMetadataManagerTestUtils {
 
+  public static final String TEST_USER = "TestUser";
   private static OzoneConfiguration configuration;
   private OMMetadataManagerTestUtils() {
   }
@@ -79,6 +82,8 @@ public final class OMMetadataManagerTestUtils {
     OzoneConfiguration omConfiguration = new OzoneConfiguration();
     omConfiguration.set(OZONE_OM_DB_DIRS,
         omDbDir.getAbsolutePath());
+    omConfiguration.set(OMConfigKeys
+        .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
     OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(
         omConfiguration, null);
 
@@ -86,7 +91,7 @@ public final class OMMetadataManagerTestUtils {
     OmVolumeArgs args =
         OmVolumeArgs.newBuilder()
             .setVolume("sampleVol")
-            .setAdminName("TestUser")
+            .setAdminName(TEST_USER)
             .setOwnerName("TestUser")
             .build();
     omMetadataManager.getVolumeTable().put(volumeKey, args);
@@ -264,6 +269,56 @@ public final class OMMetadataManagerTestUtils {
                     .build());
   }
 
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  private static String getKey(OMMetadataManager omMetadataManager, String 
key, String bucket, String volume,
+                               String fileName, long parentObjectId, long 
bucketObjectId, long volumeObjectId,
+                               BucketLayout bucketLayout) {
+    String omKey;
+    if (bucketLayout.equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+      omKey = omMetadataManager.getOzonePathKey(volumeObjectId,
+          bucketObjectId, parentObjectId, fileName);
+    } else {
+      omKey = omMetadataManager.getOzoneKey(volume, bucket, key);
+    }
+    return omKey;
+  }
+
+  /**
+   * Write a key on OM instance.
+   * @throw IOException while writing.
+   */
+  @SuppressWarnings("checkstyle:parameternumber")
+  public static void writeKeyToOm(OMMetadataManager omMetadataManager,
+                                  String key,
+                                  String bucket,
+                                  String volume,
+                                  String fileName,
+                                  long objectID,
+                                  long parentObjectId,
+                                  long bucketObjectId,
+                                  long volumeObjectId,
+                                  long dataSize,
+                                  BucketLayout bucketLayout,
+                                  ReplicationConfig replicationConfig,
+                                  long creationTime, boolean isFile)
+      throws IOException {
+    String omKey =
+        getKey(omMetadataManager, key, bucket, volume, fileName, 
parentObjectId, bucketObjectId, volumeObjectId,
+            bucketLayout);
+    omMetadataManager.getKeyTable(bucketLayout).put(omKey,
+        new OmKeyInfo.Builder()
+            .setBucketName(bucket)
+            .setVolumeName(volume)
+            .setKeyName(key)
+            .setFile(isFile)
+            .setReplicationConfig(replicationConfig)
+            .setCreationTime(creationTime)
+            .setObjectID(objectID)
+            .setParentObjectID(parentObjectId)
+            .setDataSize(dataSize)
+            .build());
+  }
+
   /**
    * Write an open key to OM instance optimized for File System.
    *
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index d89fdd6660..74c58cd9d3 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -19,20 +19,27 @@
 package org.apache.hadoop.ozone.recon.api;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.recon.ReconTestInjector;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo;
 import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
+import org.apache.hadoop.ozone.recon.api.types.ListKeysResponse;
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
 import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
@@ -40,10 +47,14 @@ import 
org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.scm.ReconPipelineManager;
 import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
 import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import 
org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO;
+import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithLegacy;
+import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithOBS;
 import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
 import org.junit.jupiter.api.BeforeEach;
@@ -59,16 +70,21 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Random;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 import java.util.Set;
 import java.util.HashSet;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.TEST_USER;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getBucketLayout;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getOmKeyLocationInfo;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -86,12 +102,169 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
   private OMMetadataManager omMetadataManager;
   private ReconPipelineManager reconPipelineManager;
   private ReconOMMetadataManager reconOMMetadataManager;
+  private ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+  private NSSummaryTaskWithLegacy nSSummaryTaskWithLegacy;
+  private NSSummaryTaskWithOBS nsSummaryTaskWithOBS;
+  private NSSummaryTaskWithFSO nsSummaryTaskWithFSO;
   private OMDBInsightEndpoint omdbInsightEndpoint;
   private Pipeline pipeline;
   private Random random = new Random();
   private OzoneConfiguration ozoneConfiguration;
   private Set<Long> generatedIds = new HashSet<>();
 
+  private static final String VOLUME_ONE = "volume1";
+
+  private static final String OBS_BUCKET = "obs-bucket";
+  private static final String FSO_BUCKET = "fso-bucket";
+  private static final String EMPTY_OBS_BUCKET = "empty-obs-bucket";
+  private static final String EMPTY_FSO_BUCKET = "empty-fso-bucket";
+  private static final String LEGACY_BUCKET = "legacy-bucket";
+  private static final String FSO_BUCKET_TWO = "fso-bucket2";
+
+  private static final String DIR_ONE = "dir1";
+  private static final String DIR_TWO = "dir2";
+  private static final String DIR_THREE = "dir3";
+
+  private static final String DIR_FOUR = "dir4";
+  private static final String DIR_FIVE = "dir5";
+  private static final String DIR_SIX = "dir6";
+  private static final String DIR_SEVEN = "dir7";
+
+  private static final String DIR_EIGHT = "dir8";
+  private static final String DIR_NINE = "dir9";
+  private static final String DIR_TEN = "dir10";
+
+  private static final String TEST_FILE = "testfile";
+  private static final String FILE_ONE = "file1";
+
+  private static final String KEY_ONE = "key1";
+  private static final String KEY_TWO = "key1/key2";
+  private static final String KEY_THREE = "key1/key2/key3";
+  private static final String KEY_FOUR = "key4";
+  private static final String KEY_FIVE = "key5";
+  private static final String KEY_SIX = "key6";
+  private static final String NON_EXISTENT_KEY_SEVEN = "key7";
+
+  private static final String KEY_EIGHT = "key8";
+  private static final String KEY_NINE = "dir4/dir5/key9";
+  private static final String KEY_TEN = "dir4/dir6/key10";
+  private static final String KEY_ELEVEN = "key11";
+  private static final String KEY_TWELVE = "key12";
+  private static final String KEY_THIRTEEN = "dir4/dir7/key13";
+
+  private static final String KEY_FOURTEEN = "dir8/key14";
+  private static final String KEY_FIFTEEN = "dir8/key15";
+  private static final String KEY_SIXTEEN = "dir9/key16";
+  private static final String KEY_SEVENTEEN = "dir9/key17";
+  private static final String KEY_EIGHTEEN = "dir8/key18";
+  private static final String KEY_NINETEEN = "dir8/key19";
+
+  private static final String FILE_EIGHT = "key8";
+  private static final String FILE_NINE = "key9";
+  private static final String FILE_TEN = "key10";
+  private static final String FILE_ELEVEN = "key11";
+  private static final String FILE_TWELVE = "key12";
+  private static final String FILE_THIRTEEN = "key13";
+
+  private static final String FILE_FOURTEEN = "key14";
+  private static final String FILE_FIFTEEN = "key15";
+  private static final String FILE_SIXTEEN = "key16";
+  private static final String FILE_SEVENTEEN = "key17";
+  private static final String FILE_EIGHTEEN = "key18";
+  private static final String FILE_NINETEEN = "key19";
+
+  private static final long PARENT_OBJECT_ID_ZERO = 0L;
+  private static final long VOLUME_ONE_OBJECT_ID = 1L;
+
+  private static final long OBS_BUCKET_OBJECT_ID = 2L;
+  private static final long KEY_ONE_OBJECT_ID = 3L;
+  private static final long KEY_TWO_OBJECT_ID = 4L;
+  private static final long KEY_THREE_OBJECT_ID = 5L;
+  private static final long KEY_FOUR_OBJECT_ID = 6L;
+  private static final long KEY_FIVE_OBJECT_ID = 7L;
+  private static final long KEY_SIX_OBJECT_ID = 8L;
+
+  private static final long FSO_BUCKET_OBJECT_ID = 10L;
+  private static final long DIR_ONE_OBJECT_ID = 11L;
+  private static final long DIR_TWO_OBJECT_ID = 12L;
+  private static final long DIR_THREE_OBJECT_ID = 13L;
+  private static final long KEY_SEVEN_OBJECT_ID = 14L;
+  private static final long KEY_EIGHT_OBJECT_ID = 15L;
+  private static final long KEY_NINE_OBJECT_ID = 16L;
+  private static final long KEY_TEN_OBJECT_ID = 17L;
+  private static final long KEY_ELEVEN_OBJECT_ID = 18L;
+  private static final long KEY_TWELVE_OBJECT_ID = 19L;
+
+  private static final long LEGACY_BUCKET_OBJECT_ID = 20L;
+  private static final long DIR_FOUR_OBJECT_ID = 21L;
+  private static final long DIR_FIVE_OBJECT_ID = 22L;
+  private static final long DIR_SIX_OBJECT_ID = 23L;
+  private static final long KEY_THIRTEEN_OBJECT_ID = 24L;
+  private static final long KEY_FOURTEEN_OBJECT_ID = 25L;
+  private static final long KEY_FIFTEEN_OBJECT_ID = 26L;
+  private static final long KEY_SIXTEEN_OBJECT_ID = 27L;
+  private static final long KEY_SEVENTEEN_OBJECT_ID = 28L;
+  private static final long KEY_EIGHTEEN_OBJECT_ID = 29L;
+
+  private static final long FSO_BUCKET_TWO_OBJECT_ID = 30L;
+  private static final long DIR_EIGHT_OBJECT_ID = 31L;
+  private static final long DIR_NINE_OBJECT_ID = 32L;
+  private static final long DIR_TEN_OBJECT_ID = 33L;
+  private static final long KEY_NINETEEN_OBJECT_ID = 34L;
+  private static final long KEY_TWENTY_OBJECT_ID = 35L;
+  private static final long KEY_TWENTY_ONE_OBJECT_ID = 36L;
+  private static final long KEY_TWENTY_TWO_OBJECT_ID = 37L;
+  private static final long KEY_TWENTY_THREE_OBJECT_ID = 38L;
+  private static final long KEY_TWENTY_FOUR_OBJECT_ID = 39L;
+
+  private static final long EMPTY_OBS_BUCKET_OBJECT_ID = 40L;
+  private static final long EMPTY_FSO_BUCKET_OBJECT_ID = 41L;
+
+  private static final long KEY_ONE_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+  private static final long KEY_TWO_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+  private static final long KEY_THREE_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_FOUR_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_FIVE_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+  private static final long KEY_SIX_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+  private static final long KEY_SEVEN_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_EIGHT_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+  private static final long KEY_NINE_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_TEN_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+  private static final long KEY_ELEVEN_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_TWELVE_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+
+  private static final long KEY_THIRTEEN_SIZE = 2 * OzoneConsts.KB + 1; // bin 
2
+
+  private static final long KEY_FOURTEEN_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_FIFTEEN_SIZE = 2 * OzoneConsts.KB + 1; // bin 2
+  private static final long KEY_SIXTEEN_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_SEVENTEEN_SIZE = 2 * OzoneConsts.KB + 1; // 
bin 2
+  private static final long KEY_EIGHTEEN_SIZE = OzoneConsts.KB + 1; // bin 1
+  private static final long KEY_NINETEEN_SIZE = 2 * OzoneConsts.KB + 1; // bin 
2
+
+  private static final String OBS_BUCKET_PATH = "/volume1/obs-bucket";
+  private static final String FSO_BUCKET_PATH = "/volume1/fso-bucket";
+  private static final String LEGACY_BUCKET_PATH = "/volume1/legacy-bucket";
+  private static final String EMPTY_OBS_BUCKET_PATH = 
"/volume1/empty-obs-bucket";
+  private static final String EMPTY_FSO_BUCKET_PATH = 
"/volume1/empty-fso-bucket";
+  private static final String FSO_BUCKET_TWO_PATH = "/volume1/fso-bucket2";
+
+  private static final String DIR_ONE_PATH = "/volume1/fso-bucket/dir1";
+  private static final String DIR_TWO_PATH = "/volume1/fso-bucket/dir1/dir2";
+  private static final String DIR_THREE_PATH = 
"/volume1/fso-bucket/dir1/dir2/dir3";
+  private static final String NON_EXISTENT_DIR_FOUR_PATH = 
"/volume1/fso-bucket/dir1/dir2/dir3/dir4";
+
+  private static final long VOLUME_ONE_QUOTA = 2 * OzoneConsts.MB;
+  private static final long OBS_BUCKET_QUOTA = OzoneConsts.MB;
+  private static final long FSO_BUCKET_QUOTA = OzoneConsts.MB;
+
+  private ReplicationConfig ratisOne = 
ReplicationConfig.fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+      HddsProtos.ReplicationFactor.ONE);
+  private long epochMillis1 =
+      ReconUtils.convertToEpochMillis("04-04-2024 12:30:00", "MM-dd-yyyy 
HH:mm:ss", TimeZone.getDefault());
+  private long epochMillis2 =
+      ReconUtils.convertToEpochMillis("04-05-2024 12:30:00", "MM-dd-yyyy 
HH:mm:ss", TimeZone.getDefault());
+
   public TestOmDBInsightEndPoint() {
     super();
   }
@@ -140,9 +313,25 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
     pipeline = getRandomPipeline();
     reconPipelineManager.addPipeline(pipeline);
     ozoneConfiguration = new OzoneConfiguration();
+    reconNamespaceSummaryManager =
+        reconTestInjector.getInstance(ReconNamespaceSummaryManager.class);
     setUpOmData();
+    nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
+        reconNamespaceSummaryManager,
+        reconOMMetadataManager, ozoneConfiguration);
+    nsSummaryTaskWithOBS  = new NSSummaryTaskWithOBS(
+        reconNamespaceSummaryManager,
+        reconOMMetadataManager, ozoneConfiguration);
+    nsSummaryTaskWithFSO  = new NSSummaryTaskWithFSO(
+        reconNamespaceSummaryManager,
+        reconOMMetadataManager, ozoneConfiguration);
+    reconNamespaceSummaryManager.clearNSSummaryTable();
+    nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
+    nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
+    nsSummaryTaskWithFSO.reprocessWithFSO(reconOMMetadataManager);
   }
 
+  @SuppressWarnings("methodlength")
   private void setUpOmData() throws Exception {
     List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
     BlockID blockID1 = new BlockID(1, 101);
@@ -215,6 +404,481 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
         new ContainerKeyMapperTask(reconContainerMetadataManager,
             ozoneConfiguration);
     containerKeyMapperTask.reprocess(reconOMMetadataManager);
+
+    String volumeOneKey = reconOMMetadataManager.getVolumeKey(VOLUME_ONE);
+    OmVolumeArgs volumeOneArgs =
+        OmVolumeArgs.newBuilder()
+            .setObjectID(VOLUME_ONE_OBJECT_ID)
+            .setVolume(VOLUME_ONE)
+            .setAdminName(TEST_USER)
+            .setOwnerName(TEST_USER)
+            .setQuotaInBytes(VOLUME_ONE_QUOTA)
+            .build();
+
+    reconOMMetadataManager.getVolumeTable().put(volumeOneKey, volumeOneArgs);
+
+    OmBucketInfo obsBucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(VOLUME_ONE)
+        .setBucketName(OBS_BUCKET)
+        .setObjectID(OBS_BUCKET_OBJECT_ID)
+        .setQuotaInBytes(OBS_BUCKET_QUOTA)
+        .setBucketLayout(BucketLayout.OBJECT_STORE)
+        .build();
+    String obsBucketKey = reconOMMetadataManager.getBucketKey(
+        obsBucketInfo.getVolumeName(), obsBucketInfo.getBucketName());
+
+    reconOMMetadataManager.getBucketTable().put(obsBucketKey, obsBucketInfo);
+
+    OmBucketInfo fsoBucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(VOLUME_ONE)
+        .setBucketName(FSO_BUCKET)
+        .setObjectID(FSO_BUCKET_OBJECT_ID)
+        .setQuotaInBytes(FSO_BUCKET_QUOTA)
+        .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+        .build();
+    String fsoBucketKey = reconOMMetadataManager.getBucketKey(
+        fsoBucketInfo.getVolumeName(), fsoBucketInfo.getBucketName());
+
+    reconOMMetadataManager.getBucketTable().put(fsoBucketKey, fsoBucketInfo);
+
+    OmBucketInfo emptyOBSBucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(VOLUME_ONE)
+        .setBucketName(EMPTY_OBS_BUCKET)
+        .setObjectID(EMPTY_OBS_BUCKET_OBJECT_ID)
+        .setQuotaInBytes(OzoneConsts.MB)
+        .setBucketLayout(BucketLayout.OBJECT_STORE)
+        .build();
+    String emptyOBSBucketKey = reconOMMetadataManager.getBucketKey(
+        emptyOBSBucketInfo.getVolumeName(), 
emptyOBSBucketInfo.getBucketName());
+
+    reconOMMetadataManager.getBucketTable().put(emptyOBSBucketKey, 
emptyOBSBucketInfo);
+
+    OmBucketInfo emptyFSOBucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(VOLUME_ONE)
+        .setBucketName(EMPTY_FSO_BUCKET)
+        .setObjectID(EMPTY_FSO_BUCKET_OBJECT_ID)
+        .setQuotaInBytes(OzoneConsts.MB)
+        .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+        .build();
+    String emptyFSOBucketKey = reconOMMetadataManager.getBucketKey(
+        emptyFSOBucketInfo.getVolumeName(), 
emptyFSOBucketInfo.getBucketName());
+
+    reconOMMetadataManager.getBucketTable().put(emptyFSOBucketKey, 
emptyFSOBucketInfo);
+
+    OmBucketInfo legacyBucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(VOLUME_ONE)
+        .setBucketName(LEGACY_BUCKET)
+        .setObjectID(LEGACY_BUCKET_OBJECT_ID)
+        .setQuotaInBytes(OzoneConsts.MB)
+        .setBucketLayout(BucketLayout.LEGACY)
+        .build();
+    String legacyBucketKey = reconOMMetadataManager.getBucketKey(
+        legacyBucketInfo.getVolumeName(), legacyBucketInfo.getBucketName());
+
+    reconOMMetadataManager.getBucketTable().put(legacyBucketKey, 
legacyBucketInfo);
+
+    OmBucketInfo fsoBucketTwoInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(VOLUME_ONE)
+        .setBucketName(FSO_BUCKET_TWO)
+        .setObjectID(FSO_BUCKET_TWO_OBJECT_ID)
+        .setQuotaInBytes(OzoneConsts.MB)
+        .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+        .build();
+    String fsoBucketTwoKey = reconOMMetadataManager.getBucketKey(
+        fsoBucketTwoInfo.getVolumeName(), fsoBucketTwoInfo.getBucketName());
+
+    reconOMMetadataManager.getBucketTable().put(fsoBucketTwoKey, 
fsoBucketTwoInfo);
+
+    // Write FSO keys data - Start
+    writeDirToOm(reconOMMetadataManager, DIR_ONE_OBJECT_ID,
+        FSO_BUCKET_OBJECT_ID, FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID, DIR_ONE);
+    writeDirToOm(reconOMMetadataManager, DIR_TWO_OBJECT_ID,
+        DIR_ONE_OBJECT_ID, FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID, DIR_TWO);
+    writeDirToOm(reconOMMetadataManager, DIR_THREE_OBJECT_ID,
+        DIR_TWO_OBJECT_ID, FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID, DIR_THREE);
+
+    writeKeyToOm(reconOMMetadataManager,
+        TEST_FILE,
+        FSO_BUCKET,
+        VOLUME_ONE,
+        TEST_FILE,
+        KEY_SEVEN_OBJECT_ID,
+        DIR_ONE_OBJECT_ID,
+        FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_SEVEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+
+    writeKeyToOm(reconOMMetadataManager,
+        FILE_ONE,
+        FSO_BUCKET,
+        VOLUME_ONE,
+        FILE_ONE,
+        KEY_EIGHT_OBJECT_ID,
+        DIR_ONE_OBJECT_ID,
+        FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_EIGHT_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis2, true);
+
+    writeKeyToOm(reconOMMetadataManager,
+        TEST_FILE,
+        FSO_BUCKET,
+        VOLUME_ONE,
+        TEST_FILE,
+        KEY_NINE_OBJECT_ID,
+        DIR_TWO_OBJECT_ID,
+        FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_NINE_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        FILE_ONE,
+        FSO_BUCKET,
+        VOLUME_ONE,
+        FILE_ONE,
+        KEY_TEN_OBJECT_ID,
+        DIR_TWO_OBJECT_ID,
+        FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_TEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis2, true);
+
+    writeKeyToOm(reconOMMetadataManager,
+        TEST_FILE,
+        FSO_BUCKET,
+        VOLUME_ONE,
+        TEST_FILE,
+        KEY_ELEVEN_OBJECT_ID,
+        DIR_THREE_OBJECT_ID,
+        FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_ELEVEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        FILE_ONE,
+        FSO_BUCKET,
+        VOLUME_ONE,
+        FILE_ONE,
+        KEY_TWELVE_OBJECT_ID,
+        DIR_THREE_OBJECT_ID,
+        FSO_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_TWELVE_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis2, true);
+    // Write FSO Keys data - End
+
+    // Write FSO bucket two keys - Start
+    writeDirToOm(reconOMMetadataManager, DIR_EIGHT_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID, FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID, DIR_EIGHT);
+    writeDirToOm(reconOMMetadataManager, DIR_NINE_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID, FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID, DIR_NINE);
+    writeDirToOm(reconOMMetadataManager, DIR_TEN_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID, FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID, DIR_TEN);
+
+    writeKeyToOm(reconOMMetadataManager,
+        TEST_FILE,
+        FSO_BUCKET_TWO,
+        VOLUME_ONE,
+        TEST_FILE,
+        KEY_NINETEEN_OBJECT_ID,
+        DIR_EIGHT_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_FOURTEEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        FILE_ONE,
+        FSO_BUCKET_TWO,
+        VOLUME_ONE,
+        FILE_ONE,
+        KEY_TWENTY_OBJECT_ID,
+        DIR_EIGHT_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_FIFTEEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+
+    writeKeyToOm(reconOMMetadataManager,
+        TEST_FILE,
+        FSO_BUCKET_TWO,
+        VOLUME_ONE,
+        TEST_FILE,
+        KEY_TWENTY_ONE_OBJECT_ID,
+        DIR_NINE_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_SIXTEEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        FILE_ONE,
+        FSO_BUCKET_TWO,
+        VOLUME_ONE,
+        FILE_ONE,
+        KEY_TWENTY_TWO_OBJECT_ID,
+        DIR_NINE_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_SEVENTEEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+
+    writeKeyToOm(reconOMMetadataManager,
+        TEST_FILE,
+        FSO_BUCKET_TWO,
+        VOLUME_ONE,
+        TEST_FILE,
+        KEY_TWENTY_THREE_OBJECT_ID,
+        DIR_TEN_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_EIGHTEEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        FILE_ONE,
+        FSO_BUCKET_TWO,
+        VOLUME_ONE,
+        FILE_ONE,
+        KEY_TWENTY_FOUR_OBJECT_ID,
+        DIR_TEN_OBJECT_ID,
+        FSO_BUCKET_TWO_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_NINETEEN_SIZE,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        ratisOne,
+        epochMillis1, true);
+    // Write FSO bucket two keys - End
+
+    // Write OBS Keys data - Start
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_ONE,
+        OBS_BUCKET,
+        VOLUME_ONE,
+        KEY_ONE,
+        KEY_ONE_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_ONE_SIZE,
+        BucketLayout.OBJECT_STORE,
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_TWO,
+        OBS_BUCKET,
+        VOLUME_ONE,
+        KEY_TWO,
+        KEY_TWO_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_TWO_SIZE,
+        BucketLayout.OBJECT_STORE,
+        ratisOne,
+        epochMillis2, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_THREE,
+        OBS_BUCKET,
+        VOLUME_ONE,
+        KEY_THREE,
+        KEY_THREE_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_THREE_SIZE,
+        BucketLayout.OBJECT_STORE,
+        ratisOne,
+        epochMillis2, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_FOUR,
+        OBS_BUCKET,
+        VOLUME_ONE,
+        KEY_FOUR,
+        KEY_FOUR_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_FOUR_SIZE,
+        BucketLayout.OBJECT_STORE,
+        ratisOne,
+        epochMillis2, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_FIVE,
+        OBS_BUCKET,
+        VOLUME_ONE,
+        KEY_FIVE,
+        KEY_FIVE_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_FIVE_SIZE,
+        BucketLayout.OBJECT_STORE,
+        ratisOne,
+        epochMillis2, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_SIX,
+        OBS_BUCKET,
+        VOLUME_ONE,
+        KEY_SIX,
+        KEY_SIX_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        OBS_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_SIX_SIZE,
+        BucketLayout.OBJECT_STORE,
+        ratisOne,
+        epochMillis2, true);
+    // Write OBS Keys data - End
+
+    // Write LEGACY keys data - Start
+    writeDirToOm(reconOMMetadataManager,
+        (DIR_FOUR + OM_KEY_PREFIX),
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        DIR_FOUR,
+        DIR_FOUR_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+        (DIR_FOUR + OM_KEY_PREFIX + DIR_FIVE + OM_KEY_PREFIX),
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        DIR_FIVE,
+        DIR_FIVE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+        (DIR_FOUR + OM_KEY_PREFIX + DIR_SIX + OM_KEY_PREFIX),
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        DIR_SIX,
+        DIR_SIX_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+        (DIR_FOUR + OM_KEY_PREFIX + DIR_SEVEN + OM_KEY_PREFIX),
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        DIR_FOUR,
+        DIR_FOUR_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        getBucketLayout());
+
+    // write all legacy bucket keys
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_EIGHT,
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        FILE_EIGHT,
+        KEY_THIRTEEN_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_EIGHT_SIZE,
+        getBucketLayout(),
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_NINE,
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        FILE_NINE,
+        KEY_FOURTEEN_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_NINE_SIZE,
+        getBucketLayout(),
+        ratisOne,
+        epochMillis2, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_TEN,
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        FILE_TEN,
+        KEY_FIFTEEN_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_TEN_SIZE,
+        getBucketLayout(),
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_ELEVEN,
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        FILE_ELEVEN,
+        KEY_SIXTEEN_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_ELEVEN_SIZE,
+        getBucketLayout(),
+        ratisOne,
+        epochMillis2, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_TWELVE,
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        FILE_TWELVE,
+        KEY_SEVENTEEN_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_TWELVE_SIZE,
+        getBucketLayout(),
+        ratisOne,
+        epochMillis1, true);
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_THIRTEEN,
+        LEGACY_BUCKET,
+        VOLUME_ONE,
+        FILE_THIRTEEN,
+        KEY_EIGHTEEN_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        LEGACY_BUCKET_OBJECT_ID,
+        VOLUME_ONE_OBJECT_ID,
+        KEY_THIRTEEN_SIZE,
+        getBucketLayout(),
+        ratisOne,
+        epochMillis1, true);
+    // Write LEGACY keys data - End
   }
 
   @Test
@@ -785,10 +1449,460 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
     assertEquals(18L, keyInsightInfoResp.getUnreplicatedDataSize());
   }
 
+  @Test
+  public void testListKeysFSOBucket() {
+    // bucket level DU
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
FSO_BUCKET_PATH,
+        "", 1000);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(6, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/file1", keyEntityInfo.getPath());
+    assertEquals("/1/10/11/file1", keyEntityInfo.getKey());
+    assertEquals("/1/10/13/testfile", listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, DIR_ONE_PATH,
+        "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(6, listKeysResponse.getKeys().size());
+
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, DIR_TWO_PATH,
+        "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(4, listKeysResponse.getKeys().size());
+
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
DIR_THREE_PATH,
+        "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+  }
+
+  @Test
+  public void testListKeysFSOBucketWithLimitAndPagination() {
+    // bucket level keyList
+    // Total 3 pages , each page 2 records. If each page we will retrieve 2 
items, as total 6 FSO keys,
+    // so till we get empty last key, we'll continue to fetch and empty last 
key signifies the last page.
+    // First Page
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
FSO_BUCKET_PATH,
+        "", 2);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/file1", keyEntityInfo.getPath());
+    assertEquals("/1/10/11/testfile", listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    // Second page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        FSO_BUCKET_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/12/testfile", listKeysResponse.getLastKey());
+
+    // Third and last page. And last page will have empty
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        FSO_BUCKET_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/dir3/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/13/testfile", listKeysResponse.getLastKey());
+
+    // Try again if fourth page is available. Ideally there should not be any 
further records
+    // and lastKey should be empty as per design.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        FSO_BUCKET_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysFSOBucketDirOnePathWithLimitTwoAndPagination() {
+    // bucket level keyList
+    // Total 3 pages , each page 2 records. If each page we will retrieve 2 
items, as total 6 FSO keys,
+    // so till we get empty last key, we'll continue to fetch and empty last 
key signifies the last page.
+    // First Page
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
DIR_ONE_PATH,
+        "", 2);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/file1", keyEntityInfo.getPath());
+    assertEquals("/1/10/11/testfile", listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    // Second page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_ONE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/12/testfile", listKeysResponse.getLastKey());
+
+    // Third and last page. And last page will have empty
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_ONE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/dir3/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/13/testfile", listKeysResponse.getLastKey());
+
+    // Try again if fourth page is available. Ideally there should not be any 
further records
+    // and lastKey should be empty as per design.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_ONE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysFSOBucketDirOnePathWithLimitOneAndPagination() {
+    // bucket level keyList
+    // Total 3 pages , each page 2 records. If each page we will retrieve 2 
items, as total 6 FSO keys,
+    // so till we get empty last key, we'll continue to fetch and empty last 
key signifies the last page.
+    // First Page
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
DIR_ONE_PATH,
+        "", 1);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(1, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/file1", keyEntityInfo.getPath());
+    assertEquals("/1/10/11/file1", listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    // Second page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_ONE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(1);
+    assertEquals("volume1/fso-bucket/dir1/dir2/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/12/file1", listKeysResponse.getLastKey());
+
+    // Third page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_ONE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(1);
+    assertEquals("volume1/fso-bucket/dir1/dir2/dir3/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/13/file1", listKeysResponse.getLastKey());
+
+    // Fourth page will have just one key
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_ONE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(1, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/dir3/testfile", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/13/testfile", listKeysResponse.getLastKey());
+
+    // Try again if fifth page is available. Ideally there should not be any 
further records
+    // and lastKey should be empty as per design.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_ONE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysFSOBucketTwoPathWithLimitAcrossDirsAtBucketLevel() {
+    // bucket level keyList
+    // Total 3 pages , each page 2 records. If each page we will retrieve 2 
items, as total 6 FSO keys,
+    // so till we get empty last key, we'll continue to fetch and empty last 
key signifies the last page.
+    // First Page
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
FSO_BUCKET_TWO_PATH,
+        "", 3);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(3, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket2/dir8/file1", keyEntityInfo.getPath());
+    assertEquals("/1/30/32/file1", listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    // Second page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        FSO_BUCKET_TWO_PATH, listKeysResponse.getLastKey(), 3);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(3, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket2/dir9/testfile", keyEntityInfo.getPath());
+    assertEquals("/1/30/33/testfile", listKeysResponse.getLastKey());
+
+    // Try again if third page is available. Ideally there should not be any 
further records
+    // and lastKey should be empty as per design.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        FSO_BUCKET_TWO_PATH, listKeysResponse.getLastKey(), 3);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysFSOBucketDirTwoPathWithLimitAndPagination() {
+    // bucket level keyList
+    // Total 2 pages , each page 2 records. If each page we will retrieve 2 
items, as total 4 FSO keys,
+    // so till we get empty last key, we'll continue to fetch and empty last 
key signifies the last page.
+    // First Page
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
DIR_TWO_PATH,
+        "", 2);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/12/testfile", listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    // Second page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_TWO_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/dir3/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/13/testfile", listKeysResponse.getLastKey());
+
+    // Try again if third page is available. Ideally there should not be any 
further records
+    // and lastKey should be empty as per design.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_TWO_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysFSOBucketDirThreePathWithLimitAndPagination() {
+    // bucket level keyList
+    // Just 1 page , page will have just 2 records. Total 2 keys
+    // so till we get empty last key, we'll continue to fetch and empty last 
key signifies the last page.
+    // First Page
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
DIR_THREE_PATH,
+        "", 2);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/fso-bucket/dir1/dir2/dir3/file1", 
keyEntityInfo.getPath());
+    assertEquals("/1/10/13/testfile", listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    // Try again if second page is available. Ideally there should not be any 
further records
+    // and lastKey should be empty as per design.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        DIR_THREE_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysOBSBucket() throws Exception {
+    // List keys under obs-bucket based on RATIS ReplicationConfig
+    // creationDate filter and keySize filter both are empty, so only RATIS 
replication type filter
+    // will be applied to return all RATIS keys under obs-bucket.
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
OBS_BUCKET_PATH,
+        "", 1000);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    // There are total 6 RATIS keys under this OBS bucket.
+    assertEquals(6, listKeysResponse.getKeys().size());
+    assertEquals("volume1/obs-bucket/key1", 
listKeysResponse.getKeys().get(0).getPath());
+    assertEquals("/volume1/obs-bucket/key1", 
listKeysResponse.getKeys().get(0).getKey());
+    assertEquals(OBS_BUCKET_PATH + OM_KEY_PREFIX + KEY_SIX, 
listKeysResponse.getLastKey());
+
+    // Filter listKeys based on key creation date
+    // creationDate filter passed 1 minute above of KEY1 creation date, so 
listKeys API will return
+    // only 5 keys excluding Key1, as 5 RATIS keys got created after 
creationDate filter value.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "04-04-2024 
12:31:00", 0,
+        OBS_BUCKET_PATH, "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    // There are total 5 RATIS keys under this OBS bucket which were created 
after "04-04-2024 12:31:00".
+    assertEquals(5, listKeysResponse.getKeys().size());
+    assertEquals(OBS_BUCKET_PATH + OM_KEY_PREFIX + KEY_SIX, 
listKeysResponse.getLastKey());
+
+    // creationDate filter passed same as KEY6 creation date, so listKeys API 
will return all 6 keys
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "04-04-2024 
12:30:00", 0,
+        OBS_BUCKET_PATH, "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    // There are total 6 RATIS keys under this OBS bucket which were created 
on or after "04-04-2024 12:30:00".
+    assertEquals(6, listKeysResponse.getKeys().size());
+    assertEquals(OBS_BUCKET_PATH + OM_KEY_PREFIX + KEY_SIX, 
listKeysResponse.getLastKey());
+
+    // creationDate filter passed as "04-04-2024 12:30:00", but 
replicationType filter is EC,
+    // so listKeys API will return zero keys, because no EC key got created at 
or after creationDate filter value.
+    bucketResponse = omdbInsightEndpoint.listKeys("EC", "04-04-2024 12:30:00", 
0,
+        OBS_BUCKET_PATH, "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    // There are ZERO EC keys created under this OBS bucket which were created 
on or after "04-04-2024 12:30:00".
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+
+    // creationDate filter passed as "04-05-2024 12:30:00", and 
replicationType filter is RATIS,
+    // so listKeys API will return 5 keys, as only 5 RATIS key got created at 
or after creationDate filter value.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "04-05-2024 
12:30:00", 0,
+        OBS_BUCKET_PATH, "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(5, listKeysResponse.getKeys().size());
+    assertEquals(OBS_BUCKET_PATH + OM_KEY_PREFIX + KEY_SIX, 
listKeysResponse.getLastKey());
+
+    // creationDate filter passed as "04-05-2024 12:30:00", and 
replicationType filter is RATIS,
+    // so listKeys API should return 5 keys, as only 1 RATIS key got created 
on or after creationDate filter value.
+    // but since keySize filter value is 1026 bytes and 3 RATIS keys created 
are of size 2025 bytes, and
+    // other 3 keys created are of size 1025, so 3 keys will be filtered out 
of 5 keys, as 1 key will be filtered
+    // out due to creationDate filter.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "04-05-2024 
12:30:00", 1026,
+        OBS_BUCKET_PATH, "", 1000);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(3, listKeysResponse.getKeys().size());
+    assertEquals(OBS_BUCKET_PATH + OM_KEY_PREFIX + KEY_SIX, 
listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysOBSBucketWithLimitAndPagination() throws Exception {
+    // As per design, client should not change filter values between pages.
+    // Below test will fetch multiple pages for same filters in query.
+
+    // Total 3 pages , each page 2 records. If each page we will retrieve 2 
items, as total 6 FSO keys,
+    // so till we get empty last key, we'll continue to fetch and empty last 
key signifies the last page.
+    // First Page
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
OBS_BUCKET_PATH,
+        "", 2);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    KeyEntityInfo keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/obs-bucket/key1", keyEntityInfo.getPath());
+    assertEquals("/volume1/obs-bucket/key1/key2", 
listKeysResponse.getLastKey());
+    assertEquals("RATIS", 
keyEntityInfo.getReplicationConfig().getReplicationType().toString());
+
+    // Second page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        OBS_BUCKET_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/obs-bucket/key1/key2/key3", keyEntityInfo.getPath());
+    assertEquals("/volume1/obs-bucket/key4", listKeysResponse.getLastKey());
+
+    // Third and last page. And last page will have empty
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        OBS_BUCKET_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    keyEntityInfo = listKeysResponse.getKeys().get(0);
+    assertEquals("volume1/obs-bucket/key5", keyEntityInfo.getPath());
+    assertEquals("/volume1/obs-bucket/key6", listKeysResponse.getLastKey());
+
+    // Try again if fourth page is available. Ideally there should not be any 
further records
+    // and lastKey should be empty as per design.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0,
+        OBS_BUCKET_PATH, listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysForEmptyOBSBucket() {
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
EMPTY_OBS_BUCKET_PATH,
+        "", 1000);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysForEmptyFSOBucket() {
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
EMPTY_FSO_BUCKET_PATH,
+        "", 1000);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysForNonExistentOBSPaths() {
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
OBS_BUCKET_PATH +
+            OM_KEY_PREFIX + NON_EXISTENT_KEY_SEVEN,
+        "", 2);
+    String entityResp = (String) bucketResponse.getEntity();
+    assertEquals("{\"message\": \"Unexpected runtime error while searching 
keys in OM DB: Not valid path: " +
+        "java.lang.UnsupportedOperationException: Object stores do not support 
directories.\"}", entityResp);
+  }
+
+  @Test
+  public void testListKeysForNonExistentFSOPaths() {
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
NON_EXISTENT_DIR_FOUR_PATH,
+        "", 2);
+    String entityResp = (String) bucketResponse.getEntity();
+    assertEquals("{\"message\": \"Unexpected runtime error while searching 
keys in OM DB: Not valid path: " +
+        "java.lang.IllegalArgumentException: Not valid path\"}", entityResp);
+  }
+
+  @Test
+  public void testListKeysForNullOrEmptyStartPrefixPath() {
+    Response nullStartPrefixResp = omdbInsightEndpoint.listKeys("RATIS", "", 
0, null,
+        "", 2);
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
nullStartPrefixResp.getStatus());
+
+    Response emptyStartPrefixResp = omdbInsightEndpoint.listKeys("RATIS", "", 
0, "",
+        "", 2);
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
emptyStartPrefixResp.getStatus());
+
+    Response invaliStartPrefixResp = omdbInsightEndpoint.listKeys("RATIS", "", 
0, "null",
+        "", 2);
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
invaliStartPrefixResp.getStatus());
+  }
+
+  @Test
+  public void testListKeysLegacyBucketWithFSEnabled() {
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
LEGACY_BUCKET_PATH,
+        "", 1000);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(6, listKeysResponse.getKeys().size());
+    assertEquals("/volume1/legacy-bucket/key8", listKeysResponse.getLastKey());
+  }
+
+  @Test
+  public void testListKeysLegacyBucketWithFSEnabledAndPagination() {
+    Response bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
LEGACY_BUCKET_PATH,
+        "", 2);
+    ListKeysResponse listKeysResponse = (ListKeysResponse) 
bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    assertEquals("/volume1/legacy-bucket/dir4/dir6/key10", 
listKeysResponse.getLastKey());
+
+    // Second page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
LEGACY_BUCKET_PATH,
+        listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    assertEquals("/volume1/legacy-bucket/key11", 
listKeysResponse.getLastKey());
+
+    // Third page
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
LEGACY_BUCKET_PATH,
+        listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(2, listKeysResponse.getKeys().size());
+    assertEquals("/volume1/legacy-bucket/key8", listKeysResponse.getLastKey());
+
+    // Fourth page should not have any keys left as we have iterated
+    // all 6 keys in 3 pages with each page returns 2 keys.
+    bucketResponse = omdbInsightEndpoint.listKeys("RATIS", "", 0, 
LEGACY_BUCKET_PATH,
+        listKeysResponse.getLastKey(), 2);
+    listKeysResponse = (ListKeysResponse) bucketResponse.getEntity();
+    assertEquals(0, listKeysResponse.getKeys().size());
+    assertEquals("", listKeysResponse.getLastKey());
+  }
+
   private NSSummary getNsSummary(long size) {
     NSSummary summary = new NSSummary();
     summary.setSizeOfFiles(size);
     return summary;
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to