This is an automated email from the ASF dual-hosted git repository.

devesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b7aaec9c HDDS-11205. Implement a search feature for users to locate 
keys pending Deletion within the OM Deleted Keys Insights section (#6969)
86b7aaec9c is described below

commit 86b7aaec9cf146d917171378e31c320a985aa707
Author: Arafat2198 <[email protected]>
AuthorDate: Fri Oct 18 19:26:40 2024 +0530

    HDDS-11205. Implement a search feature for users to locate keys pending 
Deletion within the OM Deleted Keys Insights section (#6969)
---
 .../apache/hadoop/ozone/recon/ReconConstants.java  |  10 +-
 .../hadoop/ozone/recon/ReconResponseUtils.java     |   2 +-
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  | 114 ++++-
 .../ozone/recon/api/OMDBInsightEndpoint.java       | 184 +++----
 .../ozone/recon/api/OMDBInsightSearchEndpoint.java |  96 +---
 .../recon/api/TestDeletedKeysSearchEndpoint.java   | 549 +++++++++++++++++++++
 .../recon/api/TestOMDBInsightSearchEndpoint.java   |  24 +-
 .../ozone/recon/api/TestOmDBInsightEndPoint.java   | 129 ++++-
 8 files changed, 928 insertions(+), 180 deletions(-)

diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index ed657931e0..5768166c95 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -43,20 +43,20 @@ public final class ReconConstants {
   public static final int DISK_USAGE_TOP_RECORDS_LIMIT = 30;
   public static final String DEFAULT_OPEN_KEY_INCLUDE_NON_FSO = "false";
   public static final String DEFAULT_OPEN_KEY_INCLUDE_FSO = "false";
-  public static final String DEFAULT_START_PREFIX = "/";
   public static final String DEFAULT_FETCH_COUNT = "1000";
   public static final String DEFAULT_KEY_SIZE = "0";
   public static final String DEFAULT_BATCH_NUMBER = "1";
   public static final String RECON_QUERY_BATCH_PARAM = "batchNum";
   public static final String RECON_QUERY_PREVKEY = "prevKey";
+  public static final String RECON_QUERY_START_PREFIX = "startPrefix";
   public static final String RECON_OPEN_KEY_INCLUDE_NON_FSO = "includeNonFso";
   public static final String RECON_OPEN_KEY_INCLUDE_FSO = "includeFso";
-  public static final String RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT = "1000";
-  public static final String RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY = "";
+  public static final String RECON_OM_INSIGHTS_DEFAULT_START_PREFIX = "/";
+  public static final String RECON_OM_INSIGHTS_DEFAULT_SEARCH_LIMIT = "1000";
+  public static final String RECON_OM_INSIGHTS_DEFAULT_SEARCH_PREV_KEY = "";
   public static final String RECON_QUERY_FILTER = "missingIn";
   public static final String PREV_CONTAINER_ID_DEFAULT_VALUE = "0";
-  public static final String PREV_DELETED_BLOCKS_TRANSACTION_ID_DEFAULT_VALUE =
-      "0";
+  public static final String PREV_DELETED_BLOCKS_TRANSACTION_ID_DEFAULT_VALUE 
= "0";
   // Only include containers that are missing in OM by default
   public static final String DEFAULT_FILTER_FOR_MISSING_CONTAINERS = "SCM";
   public static final String RECON_QUERY_LIMIT = "limit";
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconResponseUtils.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconResponseUtils.java
index 41235ae542..dc53f195f6 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconResponseUtils.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconResponseUtils.java
@@ -44,7 +44,7 @@ public final class ReconResponseUtils {
     String jsonResponse = String.format(
         "{\"message\": \"No keys matched the search prefix: '%s'.\"}",
         startPrefix);
-    return Response.status(Response.Status.NOT_FOUND)
+    return Response.status(Response.Status.NO_CONTENT)
         .entity(jsonResponse)
         .type(MediaType.APPLICATION_JSON)
         .build();
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 1a2a705fc0..f65e2f30cb 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -32,11 +32,14 @@ import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
-import java.util.List;
-import java.util.TimeZone;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.ArrayList;
+import java.util.TimeZone;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -54,6 +57,8 @@ import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 
@@ -596,6 +601,109 @@ public class ReconUtils {
     }
   }
 
+  public static boolean validateStartPrefix(String startPrefix) {
+
+    // Ensure startPrefix starts with '/' for non-empty values
+    startPrefix = startPrefix.startsWith("/") ? startPrefix : "/" + 
startPrefix;
+
+    // Split the path to ensure it's at least at the bucket level 
(volume/bucket).
+    String[] pathComponents = startPrefix.split("/");
+    if (pathComponents.length < 3 || pathComponents[2].isEmpty()) {
+      return false; // Invalid if not at bucket level or deeper
+    }
+
+    return true;
+  }
+
+  /**
+   * Retrieves keys from the specified table based on pagination and prefix 
filtering.
+   * This method handles different scenarios based on the presence of {@code 
startPrefix}
+   * and {@code prevKey}, enabling efficient key retrieval from the table.
+   *
+   * The method handles the following cases:
+   *
+   * 1. {@code prevKey} provided, {@code startPrefix} empty:
+   *    - Seeks to {@code prevKey}, skips it, and returns subsequent records 
up to the limit.
+   *
+   * 2. {@code prevKey} empty, {@code startPrefix} empty:
+   *    - Iterates from the beginning of the table, retrieving all records up 
to the limit.
+   *
+   * 3. {@code startPrefix} provided, {@code prevKey} empty:
+   *    - Seeks to the first key matching {@code startPrefix} and returns all 
matching keys up to the limit.
+   *
+   * 4. {@code startPrefix} provided, {@code prevKey} provided:
+   *    - Seeks to {@code prevKey}, skips it, and returns subsequent keys that 
match {@code startPrefix},
+   *      up to the limit.
+   *
+   * This method also handles the following {@code limit} scenarios:
+   * - If {@code limit == 0} or {@code limit < -1}, no records are returned.
+   * - If {@code limit == -1}, all records are returned.
+   * - For positive {@code limit}, it retrieves records up to the specified 
{@code limit}.
+   *
+   * @param table       The table to retrieve keys from.
+   * @param startPrefix The search prefix to match keys against.
+   * @param limit       The maximum number of keys to retrieve.
+   * @param prevKey     The key to start after for the next set of records.
+   * @return A map of keys and their corresponding {@code OmKeyInfo} or {@code 
RepeatedOmKeyInfo} objects.
+   * @throws IOException If there are problems accessing the table.
+   */
+  public static <T> Map<String, T> extractKeysFromTable(
+      Table<String, T> table, String startPrefix, int limit, String prevKey)
+      throws IOException {
+
+    Map<String, T> matchedKeys = new LinkedHashMap<>();
+
+    // Null check for the table to prevent NPE during omMetaManager 
initialization
+    if (table == null) {
+      log.error("Table object is null. omMetaManager might still be 
initializing.");
+      return Collections.emptyMap();
+    }
+
+    // If limit = 0, return an empty result set
+    if (limit == 0 || limit < -1) {
+      return matchedKeys;
+    }
+
+    // If limit = -1, set it to Integer.MAX_VALUE to return all records
+    int actualLimit = (limit == -1) ? Integer.MAX_VALUE : limit;
+
+    try (TableIterator<String, ? extends Table.KeyValue<String, T>> keyIter = 
table.iterator()) {
+
+      // Scenario 1 & 4: prevKey is provided (whether startPrefix is empty or 
not)
+      if (!prevKey.isEmpty()) {
+        keyIter.seek(prevKey);
+        if (keyIter.hasNext()) {
+          keyIter.next();  // Skip the previous key record
+        }
+      } else if (!startPrefix.isEmpty()) {
+        // Scenario 3: startPrefix is provided but prevKey is empty, so seek 
to startPrefix
+        keyIter.seek(startPrefix);
+      }
+
+      // Scenario 2: Both startPrefix and prevKey are empty (iterate from the 
start of the table)
+      // No seeking needed; just start iterating from the first record in the 
table
+      // This is implicit in the following loop, as the iterator will start 
from the beginning
+
+      // Iterate through the keys while adhering to the limit (if the limit is 
not zero)
+      while (keyIter.hasNext() && matchedKeys.size() < actualLimit) {
+        Table.KeyValue<String, T> entry = keyIter.next();
+        String dbKey = entry.getKey();
+
+        // Scenario 3 & 4: If startPrefix is provided, ensure the key matches 
startPrefix
+        if (!startPrefix.isEmpty() && !dbKey.startsWith(startPrefix)) {
+          break;  // If the key no longer matches the prefix, exit the loop
+        }
+
+        // Add the valid key-value pair to the results
+        matchedKeys.put(dbKey, entry.getValue());
+      }
+    } catch (IOException exception) {
+      log.error("Error retrieving keys from table for path: {}", startPrefix, 
exception);
+      throw exception;
+    }
+    return matchedKeys;
+  }
+
   /**
    * Finds all subdirectories under a parent directory in an FSO bucket. It 
builds
    * a list of paths for these subdirectories. These sub-directories are then 
used
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index 5e278a21f3..d28275e547 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -66,19 +66,26 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
 import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
 import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
-import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_KEY_SIZE;
-import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
-import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_FSO;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_NON_FSO;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_FSO;
 import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_NON_FSO;
+import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_START_PREFIX;
+import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_KEY_SIZE;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
+import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse;
+import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse;
+import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse;
+import static org.apache.hadoop.ozone.recon.ReconUtils.extractKeysFromTable;
+import static org.apache.hadoop.ozone.recon.ReconUtils.validateStartPrefix;
 import static 
org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler;
 import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath;
 import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath;
@@ -211,7 +218,7 @@ public class OMDBInsightEndpoint {
               keyIter = openKeyTable.iterator()) {
         boolean skipPrevKey = false;
         String seekKey = prevKey;
-        if (!skipPrevKeyDone && StringUtils.isNotBlank(prevKey)) {
+        if (!skipPrevKeyDone && isNotBlank(prevKey)) {
           skipPrevKey = true;
           Table.KeyValue<String, OmKeyInfo> seekKeyValue =
               keyIter.seek(seekKey);
@@ -219,7 +226,7 @@ public class OMDBInsightEndpoint {
           // if not, then return empty result
           // In case of an empty prevKeyPrefix, all the keys are returned
           if (seekKeyValue == null ||
-              (StringUtils.isNotBlank(prevKey) &&
+              (isNotBlank(prevKey) &&
                   !seekKeyValue.getKey().equals(prevKey))) {
             continue;
           }
@@ -340,62 +347,6 @@ public class OMDBInsightEndpoint {
     return record != null ? record.getValue() : 0L;
   }
 
-  private void getPendingForDeletionKeyInfo(
-      int limit,
-      String prevKey,
-      KeyInsightInfoResponse deletedKeyAndDirInsightInfo) {
-    List<RepeatedOmKeyInfo> repeatedOmKeyInfoList =
-        deletedKeyAndDirInsightInfo.getRepeatedOmKeyInfoList();
-    Table<String, RepeatedOmKeyInfo> deletedTable =
-        omMetadataManager.getDeletedTable();
-    try (
-        TableIterator<String, ? extends Table.KeyValue<String,
-            RepeatedOmKeyInfo>>
-            keyIter = deletedTable.iterator()) {
-      boolean skipPrevKey = false;
-      String seekKey = prevKey;
-      String lastKey = "";
-      if (StringUtils.isNotBlank(prevKey)) {
-        skipPrevKey = true;
-        Table.KeyValue<String, RepeatedOmKeyInfo> seekKeyValue =
-            keyIter.seek(seekKey);
-        // check if RocksDB was able to seek correctly to the given key prefix
-        // if not, then return empty result
-        // In case of an empty prevKeyPrefix, all the keys are returned
-        if (seekKeyValue == null ||
-            (StringUtils.isNotBlank(prevKey) &&
-                !seekKeyValue.getKey().equals(prevKey))) {
-          return;
-        }
-      }
-      while (keyIter.hasNext()) {
-        Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
-        String key = kv.getKey();
-        lastKey = key;
-        RepeatedOmKeyInfo repeatedOmKeyInfo = kv.getValue();
-        // skip the prev key if prev key is present
-        if (skipPrevKey && key.equals(prevKey)) {
-          continue;
-        }
-        updateReplicatedAndUnReplicatedTotal(deletedKeyAndDirInsightInfo,
-            repeatedOmKeyInfo);
-        repeatedOmKeyInfoList.add(repeatedOmKeyInfo);
-        if ((repeatedOmKeyInfoList.size()) == limit) {
-          break;
-        }
-      }
-      deletedKeyAndDirInsightInfo.setLastKey(lastKey);
-    } catch (IOException ex) {
-      throw new WebApplicationException(ex,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    } catch (IllegalArgumentException e) {
-      throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
-    } catch (Exception ex) {
-      throw new WebApplicationException(ex,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
   /** Retrieves the summary of deleted keys.
    *
    * This method calculates and returns a summary of deleted keys.
@@ -429,6 +380,7 @@ public class OMDBInsightEndpoint {
    * limit - limits the number of key/files returned.
    * prevKey - E.g. /vol1/bucket1/key1, this will skip keys till it
    * seeks correctly to the given prevKey.
+   * startPrefix - E.g. /vol1/bucket1, this will return keys matching this 
prefix.
    * Sample API Response:
    * {
    *   "lastKey": "vol1/bucket1/key1",
@@ -477,17 +429,90 @@ public class OMDBInsightEndpoint {
   @GET
   @Path("/deletePending")
   public Response getDeletedKeyInfo(
-      @DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
-      int limit,
-      @DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_PREVKEY)
-      String prevKey) {
-    KeyInsightInfoResponse
-        deletedKeyInsightInfo = new KeyInsightInfoResponse();
-    getPendingForDeletionKeyInfo(limit, prevKey,
-        deletedKeyInsightInfo);
+      @DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT) int 
limit,
+      @DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_PREVKEY) String 
prevKey,
+      @DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_START_PREFIX) 
String startPrefix) {
+
+    // Initialize the response object to hold the key information
+    KeyInsightInfoResponse deletedKeyInsightInfo = new 
KeyInsightInfoResponse();
+
+    boolean keysFound = false;
+
+    try {
+      // Validate startPrefix if it's provided
+      if (isNotBlank(startPrefix) && !validateStartPrefix(startPrefix)) {
+        return createBadRequestResponse("Invalid startPrefix: Path must be at 
the bucket level or deeper.");
+      }
+
+      // Perform the search based on the limit, prevKey, and startPrefix
+      keysFound = getPendingForDeletionKeyInfo(limit, prevKey, startPrefix, 
deletedKeyInsightInfo);
+
+    } catch (IllegalArgumentException e) {
+      LOG.error("Invalid startPrefix provided: {}", startPrefix, e);
+      return createBadRequestResponse("Invalid startPrefix: " + 
e.getMessage());
+    } catch (IOException e) {
+      LOG.error("I/O error while searching deleted keys in OM DB", e);
+      return createInternalServerErrorResponse("Error searching deleted keys 
in OM DB: " + e.getMessage());
+    } catch (Exception e) {
+      LOG.error("Unexpected error occurred while searching deleted keys", e);
+      return createInternalServerErrorResponse("Unexpected error: " + 
e.getMessage());
+    }
+
+    if (!keysFound) {
+      return noMatchedKeysResponse("");
+    }
+
     return Response.ok(deletedKeyInsightInfo).build();
   }
 
+  /**
+   * Retrieves keys pending deletion based on startPrefix, filtering keys 
matching the prefix.
+   *
+   * @param limit                 The limit of records to return.
+   * @param prevKey               Pagination key.
+   * @param startPrefix           The search prefix.
+   * @param deletedKeyInsightInfo The response object to populate.
+   */
+  private boolean getPendingForDeletionKeyInfo(
+      int limit, String prevKey, String startPrefix,
+      KeyInsightInfoResponse deletedKeyInsightInfo) throws IOException {
+
+    long replicatedTotal = 0;
+    long unreplicatedTotal = 0;
+    boolean keysFound = false;
+    String lastKey = null;
+
+    // Search for deleted keys in DeletedTable
+    Table<String, RepeatedOmKeyInfo> deletedTable = 
omMetadataManager.getDeletedTable();
+    Map<String, RepeatedOmKeyInfo> deletedKeys =
+        extractKeysFromTable(deletedTable, startPrefix, limit, prevKey);
+
+    // Iterate over the retrieved keys and populate the response
+    for (Map.Entry<String, RepeatedOmKeyInfo> entry : deletedKeys.entrySet()) {
+      keysFound = true;
+      RepeatedOmKeyInfo repeatedOmKeyInfo = entry.getValue();
+
+      // We know each RepeatedOmKeyInfo has just one OmKeyInfo object
+      OmKeyInfo keyInfo = repeatedOmKeyInfo.getOmKeyInfoList().get(0);
+      KeyEntityInfo keyEntityInfo = 
createKeyEntityInfoFromOmKeyInfo(entry.getKey(), keyInfo);
+
+      // Add the key directly to the list without classification
+      deletedKeyInsightInfo.getRepeatedOmKeyInfoList().add(repeatedOmKeyInfo);
+
+      replicatedTotal += keyInfo.getReplicatedSize();
+      unreplicatedTotal += keyInfo.getDataSize();
+
+      lastKey = entry.getKey(); // Update lastKey
+    }
+
+    // Set the aggregated totals in the response
+    deletedKeyInsightInfo.setReplicatedDataSize(replicatedTotal);
+    deletedKeyInsightInfo.setUnreplicatedDataSize(unreplicatedTotal);
+    deletedKeyInsightInfo.setLastKey(lastKey);
+
+    return keysFound;
+  }
+
   /**
    * Creates a keys summary for deleted keys and updates the provided
    * keysSummary map. Calculates the total number of deleted keys, replicated
@@ -527,7 +552,7 @@ public class OMDBInsightEndpoint {
       boolean skipPrevKey = false;
       String seekKey = prevKey;
       String lastKey = "";
-      if (StringUtils.isNotBlank(prevKey)) {
+      if (isNotBlank(prevKey)) {
         skipPrevKey = true;
         Table.KeyValue<String, OmKeyInfo> seekKeyValue =
             keyIter.seek(seekKey);
@@ -535,7 +560,7 @@ public class OMDBInsightEndpoint {
         // if not, then return empty result
         // In case of an empty prevKeyPrefix, all the keys are returned
         if (seekKeyValue == null ||
-            (StringUtils.isNotBlank(prevKey) &&
+            (isNotBlank(prevKey) &&
                 !seekKeyValue.getKey().equals(prevKey))) {
           return;
         }
@@ -961,7 +986,7 @@ public class OMDBInsightEndpoint {
         limit, false, "");
     Response response = getListKeysResponse(paramInfo);
     if ((response.getStatus() != Response.Status.OK.getStatusCode()) &&
-        (response.getStatus() != Response.Status.NOT_FOUND.getStatusCode())) {
+        (response.getStatus() != Response.Status.NO_CONTENT.getStatusCode())) {
       return response;
     }
     if (response.getEntity() instanceof ListKeysResponse) {
@@ -1162,7 +1187,7 @@ public class OMDBInsightEndpoint {
     try (
         TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyIter = table.iterator()) {
 
-      if (!paramInfo.isSkipPrevKeyDone() && StringUtils.isNotBlank(seekKey)) {
+      if (!paramInfo.isSkipPrevKeyDone() && isNotBlank(seekKey)) {
         skipPrevKey = true;
         Table.KeyValue<String, OmKeyInfo> seekKeyValue =
             keyIter.seek(seekKey);
@@ -1279,19 +1304,6 @@ public class OMDBInsightEndpoint {
     dirSummary.put("totalDeletedDirectories", deletedDirCount);
   }
 
-  private void updateReplicatedAndUnReplicatedTotal(
-      KeyInsightInfoResponse deletedKeyAndDirInsightInfo,
-      RepeatedOmKeyInfo repeatedOmKeyInfo) {
-    repeatedOmKeyInfo.getOmKeyInfoList().forEach(omKeyInfo -> {
-      deletedKeyAndDirInsightInfo.setUnreplicatedDataSize(
-          deletedKeyAndDirInsightInfo.getUnreplicatedDataSize() +
-              omKeyInfo.getDataSize());
-      deletedKeyAndDirInsightInfo.setReplicatedDataSize(
-          deletedKeyAndDirInsightInfo.getReplicatedDataSize() +
-              omKeyInfo.getReplicatedSize());
-    });
-  }
-
   private String createPath(OmKeyInfo omKeyInfo) {
     return omKeyInfo.getVolumeName() + OM_KEY_PREFIX +
         omKeyInfo.getBucketName() + OM_KEY_PREFIX + omKeyInfo.getKeyName();
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java
index 16b5f20b12..fcd73fbe72 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java
@@ -20,12 +20,10 @@ package org.apache.hadoop.ozone.recon.api;
 
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler;
 import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo;
 import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
@@ -50,13 +48,16 @@ import java.util.List;
 import java.util.ArrayList;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static 
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_START_PREFIX;
-import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT;
-import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY;
+import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_INSIGHTS_DEFAULT_START_PREFIX;
+import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_INSIGHTS_DEFAULT_SEARCH_LIMIT;
+import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_INSIGHTS_DEFAULT_SEARCH_PREV_KEY;
 import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse;
 import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse;
 import static 
org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse;
+import static org.apache.hadoop.ozone.recon.ReconUtils.validateStartPrefix;
 import static 
org.apache.hadoop.ozone.recon.ReconUtils.constructObjectPathWithPrefix;
+import static org.apache.hadoop.ozone.recon.ReconUtils.extractKeysFromTable;
+import static org.apache.hadoop.ozone.recon.ReconUtils.gatherSubPaths;
 import static org.apache.hadoop.ozone.recon.ReconUtils.validateNames;
 import static 
org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler;
 import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath;
@@ -64,6 +65,11 @@ import static 
org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequ
 
 /**
  * REST endpoint for search implementation in OM DB Insight.
+ *
+ * This class provides endpoints for searching keys in the Ozone Manager 
database.
+ * It supports searching for both open and deleted keys across File System 
Optimized (FSO)
+ * and Object Store (non-FSO) bucket layouts. The results include matching 
keys and their
+ * data sizes.
  */
 @Path("/keys")
 @Produces(MediaType.APPLICATION_JSON)
@@ -88,14 +94,14 @@ public class OMDBInsightSearchEndpoint {
 
 
   /**
-   * Performs a search for open keys in the Ozone Manager (OM) database using 
a specified search prefix.
+   * Performs a search for open keys in the Ozone Manager OpenKey and OpenFile 
table using a specified search prefix.
    * This endpoint searches across both File System Optimized (FSO) and Object 
Store (non-FSO) layouts,
    * compiling a list of keys that match the given prefix along with their 
data sizes.
-   * <p>
+   *
    * The search prefix must start from the bucket level 
('/volumeName/bucketName/') or any specific directory
    * or key level (e.g., '/volA/bucketA/dir1' for everything under 'dir1' 
inside 'bucketA' of 'volA').
    * The search operation matches the prefix against the start of keys' names 
within the OM DB.
-   * <p>
+   *
    * Example Usage:
    * 1. A startPrefix of "/volA/bucketA/" retrieves every key under bucket 
'bucketA' in volume 'volA'.
    * 2. Specifying "/volA/bucketA/dir1" focuses the search within 'dir1' 
inside 'bucketA' of 'volA'.
@@ -110,25 +116,17 @@ public class OMDBInsightSearchEndpoint {
   @GET
   @Path("/open/search")
   public Response searchOpenKeys(
-      @DefaultValue(DEFAULT_START_PREFIX) @QueryParam("startPrefix")
+      @DefaultValue(RECON_OM_INSIGHTS_DEFAULT_START_PREFIX) 
@QueryParam("startPrefix")
       String startPrefix,
-      @DefaultValue(RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT) @QueryParam("limit")
+      @DefaultValue(RECON_OM_INSIGHTS_DEFAULT_SEARCH_LIMIT) 
@QueryParam("limit")
       int limit,
-      @DefaultValue(RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY) 
@QueryParam("prevKey") String prevKey) throws IOException {
+      @DefaultValue(RECON_OM_INSIGHTS_DEFAULT_SEARCH_PREV_KEY) 
@QueryParam("prevKey")
+      String prevKey) throws IOException {
 
     try {
-      // Ensure startPrefix is not null or empty and starts with '/'
-      if (startPrefix == null || startPrefix.length() == 0) {
-        return createBadRequestResponse(
-            "Invalid startPrefix: Path must be at the bucket level or 
deeper.");
-      }
-      startPrefix = startPrefix.startsWith("/") ? startPrefix : "/" + 
startPrefix;
-
-      // Split the path to ensure it's at least at the bucket level
-      String[] pathComponents = startPrefix.split("/");
-      if (pathComponents.length < 3 || pathComponents[2].isEmpty()) {
-        return createBadRequestResponse(
-            "Invalid startPrefix: Path must be at the bucket level or 
deeper.");
+      // Validate the request parameters
+      if (!validateStartPrefix(startPrefix)) {
+        return createBadRequestResponse("Invalid startPrefix: Path must be at 
the bucket level or deeper.");
       }
 
       // Ensure the limit is non-negative
@@ -145,7 +143,7 @@ public class OMDBInsightSearchEndpoint {
       Table<String, OmKeyInfo> openKeyTable =
           omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY);
       Map<String, OmKeyInfo> obsKeys =
-          retrieveKeysFromTable(openKeyTable, startPrefix, limit, prevKey);
+          extractKeysFromTable(openKeyTable, startPrefix, limit, prevKey);
       for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) {
         keysFound = true;
         KeyEntityInfo keyEntityInfo =
@@ -221,12 +219,13 @@ public class OMDBInsightSearchEndpoint {
       subPaths.add(startPrefixObjectPath);
 
       // Recursively gather all subpaths
-      ReconUtils.gatherSubPaths(parentId, subPaths, Long.parseLong(names[0]), 
Long.parseLong(names[1]),
+      gatherSubPaths(parentId, subPaths, Long.parseLong(names[0]), 
Long.parseLong(names[1]),
           reconNamespaceSummaryManager);
 
       // Iterate over the subpaths and retrieve the open files
       for (String subPath : subPaths) {
-        matchedKeys.putAll(retrieveKeysFromTable(openFileTable, subPath, limit 
- matchedKeys.size(), prevKey));
+        matchedKeys.putAll(
+            extractKeysFromTable(openFileTable, subPath, limit - 
matchedKeys.size(), prevKey));
         if (matchedKeys.size() >= limit) {
           break;
         }
@@ -235,7 +234,8 @@ public class OMDBInsightSearchEndpoint {
     }
 
     // If the search level is at the volume, bucket or key level, directly 
search the openFileTable
-    matchedKeys.putAll(retrieveKeysFromTable(openFileTable, 
startPrefixObjectPath, limit, prevKey));
+    matchedKeys.putAll(
+        extractKeysFromTable(openFileTable, startPrefixObjectPath, limit, 
prevKey));
     return matchedKeys;
   }
 
@@ -327,48 +327,6 @@ public class OMDBInsightSearchEndpoint {
     return prevKeyPrefix;
   }
 
-
-  /**
-   * Common method to retrieve keys from a table based on a search prefix and 
a limit.
-   *
-   * @param table       The table to retrieve keys from.
-   * @param startPrefix The search prefix to match keys against.
-   * @param limit       The maximum number of keys to retrieve.
-   * @param prevKey     The key to start after for the next set of records.
-   * @return A map of keys and their corresponding OmKeyInfo objects.
-   * @throws IOException If there are problems accessing the table.
-   */
-  private Map<String, OmKeyInfo> retrieveKeysFromTable(
-      Table<String, OmKeyInfo> table, String startPrefix, int limit, String 
prevKey)
-      throws IOException {
-    Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>();
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyIter = table.iterator()) {
-      // If a previous key is provided, seek to the previous key and skip it.
-      if (!prevKey.isEmpty()) {
-        keyIter.seek(prevKey);
-        if (keyIter.hasNext()) {
-          // Skip the previous key
-          keyIter.next();
-        }
-      } else {
-        // If no previous key is provided, start from the search prefix.
-        keyIter.seek(startPrefix);
-      }
-      while (keyIter.hasNext() && matchedKeys.size() < limit) {
-        Table.KeyValue<String, OmKeyInfo> entry = keyIter.next();
-        String dbKey = entry.getKey();
-        if (!dbKey.startsWith(startPrefix)) {
-          break; // Exit the loop if the key no longer matches the prefix
-        }
-        matchedKeys.put(dbKey, entry.getValue());
-      }
-    } catch (IOException exception) {
-      LOG.error("Error retrieving keys from table for path: {}", startPrefix, 
exception);
-      throw exception;
-    }
-    return matchedKeys;
-  }
-
   /**
    * Creates a KeyEntityInfo object from an OmKeyInfo object and the 
corresponding key.
    *
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestDeletedKeysSearchEndpoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestDeletedKeysSearchEndpoint.java
new file mode 100644
index 0000000000..5f3d0fa126
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestDeletedKeysSearchEndpoint.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.recon.ReconTestInjector;
+import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
+import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import 
org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test class for DeletedKeysSearchEndpoint.
+ *
+ * This class tests various scenarios for searching deleted keys within a
+ * given volume, bucket, and directory structure. The tests include:
+ *
+ * 1. Test Root Level Search Restriction: Ensures searching at the root level 
returns a bad request.
+ * 2. Test Volume Level Search Restriction: Ensures searching at the volume 
level returns a bad request.
+ * 3. Test Bucket Level Search: Verifies search results within different types 
of buckets, both FSO and OBS.
+ * 4. Test Directory Level Search: Validates searching inside specific 
directories.
+ * 5. Test Key Level Search: Confirms search results for specific keys within 
buckets, both FSO and OBS.
+ * 6. Test Key Level Search Under Directory: Verifies searching for keys 
within nested directories.
+ * 7. Test Search Under Nested Directory: Checks search results within nested 
directories.
+ * 8. Test Limit Search: Tests the limit functionality of the search API.
+ * 9. Test Search Deleted Keys with Bad Request: Ensures bad requests with 
invalid params return correct responses.
+ * 10. Test Last Key in Response: Confirms the presence of the last key in 
paginated responses.
+ * 11. Test Search Deleted Keys with Pagination: Verifies paginated search 
results.
+ * 12. Test Search in Empty Bucket: Checks the response for searching within 
an empty bucket.
+ */
+public class TestDeletedKeysSearchEndpoint extends AbstractReconSqlDBTest {
+
+  @TempDir
+  private Path temporaryFolder;
+  private ReconOMMetadataManager reconOMMetadataManager;
+  private OMDBInsightEndpoint omdbInsightEndpoint;
+  private OzoneConfiguration ozoneConfiguration;
+  private static final String ROOT_PATH = "/";
+  private OMMetadataManager omMetadataManager;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    ozoneConfiguration = new OzoneConfiguration();
+    
ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 
100);
+    omMetadataManager = initializeNewOmMetadataManager(
+        
Files.createDirectory(temporaryFolder.resolve("JunitOmDBDir")).toFile());
+    reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
+        
Files.createDirectory(temporaryFolder.resolve("OmMetataDir")).toFile());
+
+    ReconTestInjector reconTestInjector =
+        new ReconTestInjector.Builder(temporaryFolder.toFile())
+            .withReconSqlDb()
+            .withReconOm(reconOMMetadataManager)
+            .withOmServiceProvider(mock(OzoneManagerServiceProviderImpl.class))
+            .addBinding(OzoneStorageContainerManager.class,
+                ReconStorageContainerManagerFacade.class)
+            .withContainerDB()
+            .addBinding(StorageContainerServiceProvider.class,
+                mock(StorageContainerServiceProviderImpl.class))
+            .addBinding(OMDBInsightEndpoint.class)
+            .addBinding(ContainerHealthSchemaManager.class)
+            .build();
+    omdbInsightEndpoint = 
reconTestInjector.getInstance(OMDBInsightEndpoint.class);
+    populateOMDB();
+  }
+
+
+  private static OMMetadataManager initializeNewOmMetadataManager(File 
omDbDir) throws IOException {
+    OzoneConfiguration omConfiguration = new OzoneConfiguration();
+    omConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath());
+    return new OmMetadataManagerImpl(omConfiguration, null);
+  }
+
+  @Test
+  public void testRootLevelSearchRestriction() throws IOException {
+    String rootPath = "/";
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
rootPath);
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+  }
+
+  @Test
+  public void testEmptySearchPrefix() throws IOException {
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(100, "", "");
+    // In this case we get all the keys from the OMDB
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(16, result.getRepeatedOmKeyInfoList().size());
+
+    // Set limit to 10 and pass empty search prefix
+    response = omdbInsightEndpoint.getDeletedKeyInfo(10, "", "");
+    // In this case we get all the keys from the OMDB
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(10, result.getRepeatedOmKeyInfoList().size());
+  }
+
+  @Test
+  public void testVolumeLevelSearchRestriction() throws IOException {
+    String volumePath = "/vola";
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
volumePath);
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+
+    volumePath = "/volb";
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", volumePath);
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+  }
+
+  @Test
+  public void testBucketLevelSearch() throws IOException {
+    // Search inside FSO bucket
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volb/bucketb1");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(7, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(2, "", "/volb/bucketb1");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(2, result.getRepeatedOmKeyInfoList().size());
+
+    // Search inside OBS bucket
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", "/volc/bucketc1");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(9, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/vola/nonexistentbucket");
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testDirectoryLevelSearch() throws IOException {
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volc/bucketc1/dirc1");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(4, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volc/bucketc1/dirc2");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(5, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volb/bucketb1/nonexistentdir");
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testKeyLevelSearch() throws IOException {
+    // FSO Bucket key-level search
+    Response response =
+        omdbInsightEndpoint.getDeletedKeyInfo(10, "", "/volb/bucketb1/fileb1");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result =
+        (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getRepeatedOmKeyInfoList().size());
+
+    response =
+        omdbInsightEndpoint.getDeletedKeyInfo(10, "", "/volb/bucketb1/fileb2");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getRepeatedOmKeyInfoList().size());
+
+    // Test with non-existent key
+    response = omdbInsightEndpoint.getDeletedKeyInfo(1, "", 
"/volb/bucketb1/nonexistentfile");
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(),
+        response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testKeyLevelSearchUnderDirectory() throws IOException {
+    // FSO Bucket key-level search under directory
+    Response response =
+        omdbInsightEndpoint.getDeletedKeyInfo(10, "", 
"/volb/bucketb1/dir1/file1");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(1, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(10, "",
+        "/volb/bucketb1/dir1/nonexistentfile");
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(),
+        response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testSearchUnderNestedDirectory() throws IOException {
+    // OBS Bucket nested directory search
+    Response response =
+        omdbInsightEndpoint.getDeletedKeyInfo(20, "", "/volc/bucketc1/dirc1");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(4, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volc/bucketc1/dirc1/dirc11");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(2, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volc/bucketc1/dirc1/dirc11/dirc111");
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getRepeatedOmKeyInfoList().size());
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volc/bucketc1/dirc1/dirc11/dirc111/nonexistentfile");
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volc/bucketc1/dirc1/dirc11/nonexistentfile");
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testLimitSearch() throws IOException {
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(2, "", 
"/volb/bucketb1");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(2, result.getRepeatedOmKeyInfoList().size());
+  }
+
+  @Test
+  public void testSearchDeletedKeysWithBadRequest() throws IOException {
+    int negativeLimit = -1;
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(negativeLimit, 
"", "@323232");
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", "///");
+    assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    entity = (String) response.getEntity();
+    assertTrue(entity.contains("Invalid startPrefix: Path must be at the 
bucket level or deeper"),
+        "Expected a message indicating the path must be at the bucket level or 
deeper");
+  }
+
+  @Test
+  public void testLastKeyInResponse() throws IOException {
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volb/bucketb1");
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(7, result.getRepeatedOmKeyInfoList().size());
+
+    // Compute the expected last key from the last entry in the result list
+    String computedLastKey = "/" +
+        
result.getRepeatedOmKeyInfoList().get(6).getOmKeyInfoList().get(0).getVolumeName()
 + "/" +
+        
result.getRepeatedOmKeyInfoList().get(6).getOmKeyInfoList().get(0).getBucketName()
 + "/" +
+        
result.getRepeatedOmKeyInfoList().get(6).getOmKeyInfoList().get(0).getKeyName() 
+ "/";
+
+    // Check that the last key in the response starts with the expected value
+    assertTrue(result.getLastKey().startsWith(computedLastKey));
+  }
+
+  @Test
+  public void testSearchDeletedKeysWithPagination() throws IOException {
+    String startPrefix = "/volb/bucketb1";
+    int limit = 2;
+    String prevKey = "";
+
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+    assertEquals(2, result.getRepeatedOmKeyInfoList().size());
+
+    prevKey = result.getLastKey();
+    assertNotNull(prevKey, "Last key should not be null");
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(2, result.getRepeatedOmKeyInfoList().size());
+
+    prevKey = result.getLastKey();
+    assertNotNull(prevKey, "Last key should not be null");
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(2, result.getRepeatedOmKeyInfoList().size());
+
+    prevKey = result.getLastKey();
+    assertNotNull(prevKey, "Last key should not be null");
+
+    response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    result = (KeyInsightInfoResponse) response.getEntity();
+    assertEquals(1, result.getRepeatedOmKeyInfoList().size());
+    // Compute the expected last key from the last entry in the result list
+    String computedLastKey = "/" +
+        result.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0)
+            .getVolumeName() + "/" +
+        result.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0)
+            .getBucketName() + "/" +
+        result.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0)
+            .getKeyName() + "/";
+
+    // Check that the last key in the response starts with the expected value
+    assertTrue(result.getLastKey().startsWith(computedLastKey));
+  }
+
+  @Test
+  public void testSearchInEmptyBucket() throws IOException {
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(20, "", 
"/volb/bucketb2");
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
+    String entity = (String) response.getEntity();
+    assertTrue(entity.contains("No keys matched the search prefix"),
+        "Expected a message indicating no keys were found");
+  }
+
+  @Test
+  public void testPrevKeyProvidedStartPrefixEmpty() throws IOException {
+    // Case 1: prevKey provided, startPrefix empty
+    // Seek to the prevKey, skip the first matching record, then return 
remaining records until limit is reached.
+    String prevKey = "/volb/bucketb1/fileb3"; // This key exists, will skip it
+    int limit = 3;
+    String startPrefix = ""; // Empty startPrefix
+
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+
+    // Assert that we get the next 3 records after skipping the prevKey
+    assertEquals(3, result.getRepeatedOmKeyInfoList().size());
+    assertEquals("fileb4", 
result.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0).getKeyName());
+  }
+
+  @Test
+  public void testPrevKeyEmptyStartPrefixEmpty() throws IOException {
+    // Case 2: prevKey empty, startPrefix empty
+    // No need to seek, start from the first record and return records until 
limit is reached.
+    String prevKey = ""; // Empty prevKey
+    int limit = 100;
+    String startPrefix = ""; // Empty startPrefix
+
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+
+    // Assert that we get all the 16 records currently in the deleted keys 
table
+    assertEquals(16, result.getRepeatedOmKeyInfoList().size());
+  }
+
+  @Test
+  public void testPrevKeyEmptyStartPrefixProvided() throws IOException {
+    // Case 3: prevKey empty, startPrefix provided
+    // Seek to the startPrefix and return matching records until limit is 
reached.
+    String prevKey = ""; // Empty prevKey
+    int limit = 2;
+    String startPrefix = "/volb/bucketb1/fileb"; // Seek to startPrefix and 
match files
+
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+
+    // Assert that we get the first 2 records that match startPrefix
+    assertEquals(2, result.getRepeatedOmKeyInfoList().size());
+    assertEquals("fileb1", 
result.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0).getKeyName());
+  }
+
+  @Test
+  public void testPrevKeyProvidedStartPrefixProvided() throws IOException {
+    // Case 4: prevKey provided, startPrefix provided
+    // Seek to the prevKey, skip it, and return remaining records matching 
startPrefix until limit is reached.
+    String prevKey = "/volb/bucketb1/fileb2"; // This key exists, will skip it
+    int limit = 3;
+    String startPrefix = "/volb/bucketb1"; // Matching prefix
+
+    Response response = omdbInsightEndpoint.getDeletedKeyInfo(limit, prevKey, 
startPrefix);
+    assertEquals(200, response.getStatus());
+    KeyInsightInfoResponse result = (KeyInsightInfoResponse) 
response.getEntity();
+
+    // Assert that we get the next 2 records that match startPrefix after 
skipping prevKey having fileb2
+    assertEquals(3, result.getRepeatedOmKeyInfoList().size());
+    assertEquals("fileb3", 
result.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0).getKeyName());
+  }
+
+
+  /**
+   * Populates the OMDB with a set of deleted keys for testing purposes.
+   * This diagram is for reference:
+   * * root
+   *   ├── volb (Total Size: 7000KB)
+   *   │   ├── bucketb1 (Total Size: 7000KB)
+   *   │   │   ├── fileb1 (Size: 1000KB)
+   *   │   │   ├── fileb2 (Size: 1000KB)
+   *   │   │   ├── fileb3 (Size: 1000KB)
+   *   │   │   ├── fileb4 (Size: 1000KB)
+   *   │   │   ├── fileb5 (Size: 1000KB)
+   *   │   │   ├── dir1 (Total Size: 2000KB)
+   *   │   │   │   ├── file1 (Size: 1000KB)
+   *   │   │   │   └── file2 (Size: 1000KB)
+   *   ├── volc (Total Size: 9000KB)
+   *   │   ├── bucketc1 (Total Size: 9000KB)
+   *   │   │   ├── dirc1 (Total Size: 4000KB)
+   *   │   │   │   ├── filec1 (Size: 1000KB)
+   *   │   │   │   ├── filec2 (Size: 1000KB)
+   *   │   │   │   ├── dirc11 (Total Size: 2000KB)
+   *   │   │   │       ├── filec11 (Size: 1000KB)
+   *   │   │   │       └── dirc111 (Total Size: 1000KB)
+   *   │   │   │           └── filec111 (Size: 1000KB)
+   *   │   │   ├── dirc2 (Total Size: 5000KB)
+   *   │   │   │   ├── filec3 (Size: 1000KB)
+   *   │   │   │   ├── filec4 (Size: 1000KB)
+   *   │   │   │   ├── filec5 (Size: 1000KB)
+   *   │   │   │   ├── filec6 (Size: 1000KB)
+   *   │   │   │   └── filec7 (Size: 1000KB)
+   *
+   * @throws Exception if an error occurs while creating deleted keys.
+   */
+  private void populateOMDB() throws Exception {
+
+    createDeletedKey("fileb1", "bucketb1", "volb", 1000);
+    createDeletedKey("fileb2", "bucketb1", "volb", 1000);
+    createDeletedKey("fileb3", "bucketb1", "volb", 1000);
+    createDeletedKey("fileb4", "bucketb1", "volb", 1000);
+    createDeletedKey("fileb5", "bucketb1", "volb", 1000);
+
+    createDeletedKey("dir1/file1", "bucketb1", "volb", 1000);
+    createDeletedKey("dir1/file2", "bucketb1", "volb", 1000);
+
+    createDeletedKey("dirc1/filec1", "bucketc1", "volc", 1000);
+    createDeletedKey("dirc1/filec2", "bucketc1", "volc", 1000);
+    createDeletedKey("dirc2/filec3", "bucketc1", "volc", 1000);
+    createDeletedKey("dirc2/filec4", "bucketc1", "volc", 1000);
+    createDeletedKey("dirc2/filec5", "bucketc1", "volc", 1000);
+    createDeletedKey("dirc2/filgetec6", "bucketc1", "volc", 1000);
+    createDeletedKey("dirc2/filec7", "bucketc1", "volc", 1000);
+
+    // create nested directories and files in bucketc1
+    createDeletedKey("dirc1/dirc11/filec11", "bucketc1", "volc", 1000);
+    createDeletedKey("dirc1/dirc11/dirc111/filec111", "bucketc1", "volc", 
1000);
+  }
+
+  private void createDeletedKey(String keyName, String bucketName,
+                                String volumeName, long dataSize) throws 
IOException {
+    // Construct the deleted key path
+    String deletedKey = "/" + volumeName + "/" + bucketName + "/" + keyName + 
"/" +
+            UUID.randomUUID().getMostSignificantBits();
+
+    // Create a list to hold OmKeyInfo objects
+    List<OmKeyInfo> omKeyInfos = new ArrayList<>();
+
+    // Build OmKeyInfo object
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(dataSize)
+        .setObjectID(UUID.randomUUID().getMostSignificantBits())
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.ONE))
+        .build();
+
+    // Add the OmKeyInfo object to the list
+    omKeyInfos.add(omKeyInfo);
+
+    // Create a RepeatedOmKeyInfo object with the list of OmKeyInfo
+    RepeatedOmKeyInfo repeatedOmKeyInfo = new RepeatedOmKeyInfo(omKeyInfos);
+
+    // Write the deleted key information to the OM metadata manager
+    writeDeletedKeysToOm(reconOMMetadataManager, deletedKey, 
repeatedOmKeyInfo);
+  }
+
+  /**
+   * Writes deleted key information to the Ozone Manager metadata table.
+   * @param omMetadataManager the Ozone Manager metadata manager
+   * @param deletedKey the name of the deleted key
+   * @param repeatedOmKeyInfo the RepeatedOmKeyInfo object containing key 
information
+   * @throws IOException if there is an error accessing the metadata table
+   */
+  public static void writeDeletedKeysToOm(OMMetadataManager omMetadataManager,
+                                          String deletedKey,
+                                          RepeatedOmKeyInfo repeatedOmKeyInfo) 
throws IOException {
+    // Put the deleted key information into the deleted table
+    omMetadataManager.getDeletedTable().put(deletedKey, repeatedOmKeyInfo);
+  }
+
+}
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java
index ab16f349af..c3c2fe5deb 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java
@@ -97,11 +97,9 @@ public class TestOMDBInsightSearchEndpoint extends 
AbstractReconSqlDBTest {
   @BeforeEach
   public void setUp() throws Exception {
     ozoneConfiguration = new OzoneConfiguration();
-    ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
-        100);
+    
ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 
100);
     omMetadataManager = initializeNewOmMetadataManager(
-        Files.createDirectory(temporaryFolder.resolve("JunitOmDBDir"))
-            .toFile());
+        
Files.createDirectory(temporaryFolder.resolve("JunitOmDBDir")).toFile());
     reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
         
Files.createDirectory(temporaryFolder.resolve("OmMetataDir")).toFile());
 
@@ -221,7 +219,7 @@ public class TestOMDBInsightSearchEndpoint extends 
AbstractReconSqlDBTest {
 
     // Test with bucket that does not exist
     response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/nonexistentbucket", 20, "");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     String entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
@@ -264,7 +262,7 @@ public class TestOMDBInsightSearchEndpoint extends 
AbstractReconSqlDBTest {
 
     // Test with non-existent directory
     response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/nonexistentdir", 20, 
"");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     String entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
@@ -312,13 +310,13 @@ public class TestOMDBInsightSearchEndpoint extends 
AbstractReconSqlDBTest {
 
     // Test with non-existent key
     response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/nonexistentfile", 1, 
"");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     String entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
 
     response = 
omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1/nonexistentfile", 1, 
"");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
@@ -344,14 +342,14 @@ public class TestOMDBInsightSearchEndpoint extends 
AbstractReconSqlDBTest {
 
     // Test for unknown file in fso bucket
     response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira1/unknownfile", 
10, "");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     String entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
 
     // Test for unknown file in fso bucket
     response = 
omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira2/unknownfile", 
10, "");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
@@ -402,14 +400,14 @@ public class TestOMDBInsightSearchEndpoint extends 
AbstractReconSqlDBTest {
     // Search for a non existant file under each nested directory
     response = omdbInsightSearchEndpoint.searchOpenKeys(
         "/vola/bucketa1/dira3/dira31/dira32/dira33/nonexistentfile", 20, "");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     String entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
 
     response = omdbInsightSearchEndpoint.searchOpenKeys(
         "/vola/bucketa1/dira3/dira31/dira32/nonexistentfile", 20, "");
-    assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
@@ -507,7 +505,7 @@ public class TestOMDBInsightSearchEndpoint extends 
AbstractReconSqlDBTest {
   public void testSearchInEmptyBucket() throws IOException {
     // Search in empty bucket bucketb2
     Response response = 
omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb2", 20, "");
-    assertEquals(404, response.getStatus());
+    assertEquals(Response.Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
     String entity = (String) response.getEntity();
     assertTrue(entity.contains("No keys matched the search prefix"),
         "Expected a message indicating no keys were found");
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index 74c58cd9d3..a1e8585401 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -62,6 +62,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import javax.ws.rs.core.Response;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.sql.Timestamp;
@@ -1212,7 +1213,7 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
     reconOMMetadataManager.getDeletedTable()
         .put("/sampleVol/bucketOne/key_three", repeatedOmKeyInfo3);
 
-    Response deletedKeyInfo = omdbInsightEndpoint.getDeletedKeyInfo(2, "");
+    Response deletedKeyInfo = omdbInsightEndpoint.getDeletedKeyInfo(2, "", "");
     KeyInsightInfoResponse keyInsightInfoResp =
         (KeyInsightInfoResponse) deletedKeyInfo.getEntity();
     assertNotNull(keyInsightInfoResp);
@@ -1244,7 +1245,7 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
         .put("/sampleVol/bucketOne/key_three", repeatedOmKeyInfo3);
 
     Response deletedKeyInfo = omdbInsightEndpoint.getDeletedKeyInfo(2,
-        "/sampleVol/bucketOne/key_one");
+        "/sampleVol/bucketOne/key_one", "");
     KeyInsightInfoResponse keyInsightInfoResp =
         (KeyInsightInfoResponse) deletedKeyInfo.getEntity();
     assertNotNull(keyInsightInfoResp);
@@ -1278,7 +1279,7 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
             .get("/sampleVol/bucketOne/key_one");
     assertEquals("key_one",
         repeatedOmKeyInfo1.getOmKeyInfoList().get(0).getKeyName());
-    Response deletedKeyInfo = omdbInsightEndpoint.getDeletedKeyInfo(-1, "");
+    Response deletedKeyInfo = omdbInsightEndpoint.getDeletedKeyInfo(-1, "", 
"");
     KeyInsightInfoResponse keyInsightInfoResp =
         (KeyInsightInfoResponse) deletedKeyInfo.getEntity();
     assertNotNull(keyInsightInfoResp);
@@ -1287,6 +1288,128 @@ public class TestOmDBInsightEndPoint extends 
AbstractReconSqlDBTest {
             .get(0).getKeyName());
   }
 
+  @Test
+  public void testGetDeletedKeysWithPrevKeyProvidedAndStartPrefixEmpty()
+      throws Exception {
+    // Prepare mock data in the deletedTable.
+    for (int i = 1; i <= 10; i++) {
+      OmKeyInfo omKeyInfo =
+          getOmKeyInfo("sampleVol", "bucketOne", "deleted_key_" + i, true);
+      reconOMMetadataManager.getDeletedTable()
+          .put("/sampleVol/bucketOne/deleted_key_" + i,
+              new RepeatedOmKeyInfo(omKeyInfo));
+    }
+
+    // Case 1: prevKey provided, startPrefix empty
+    Response deletedKeyInfoResponse = omdbInsightEndpoint.getDeletedKeyInfo(5,
+        "/sampleVol/bucketOne/deleted_key_3", "");
+    KeyInsightInfoResponse keyInsightInfoResp =
+        (KeyInsightInfoResponse) deletedKeyInfoResponse.getEntity();
+
+    // Validate that the response skips the prevKey and returns subsequent 
records.
+    assertNotNull(keyInsightInfoResp);
+    assertEquals(5, keyInsightInfoResp.getRepeatedOmKeyInfoList().size());
+    assertEquals("deleted_key_4",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0).getKeyName());
+    assertEquals("deleted_key_8",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(4).getOmKeyInfoList().get(0).getKeyName());
+  }
+
+  @Test
+  public void testGetDeletedKeysWithPrevKeyEmptyAndStartPrefixEmpty()
+      throws Exception {
+    // Prepare mock data in the deletedTable.
+    for (int i = 1; i < 10; i++) {
+      OmKeyInfo omKeyInfo =
+          getOmKeyInfo("sampleVol", "bucketOne", "deleted_key_" + i, true);
+      reconOMMetadataManager.getDeletedTable()
+          .put("/sampleVol/bucketOne/deleted_key_" + i, new 
RepeatedOmKeyInfo(omKeyInfo));
+    }
+
+    // Case 2: prevKey empty, startPrefix empty
+    Response deletedKeyInfoResponse =
+        omdbInsightEndpoint.getDeletedKeyInfo(5, "", "");
+    KeyInsightInfoResponse keyInsightInfoResp =
+        (KeyInsightInfoResponse) deletedKeyInfoResponse.getEntity();
+
+    // Validate that the response retrieves from the beginning.
+    assertNotNull(keyInsightInfoResp);
+    assertEquals(5, keyInsightInfoResp.getRepeatedOmKeyInfoList().size());
+    assertEquals("deleted_key_1",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0).getKeyName());
+    assertEquals("deleted_key_5",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(4).getOmKeyInfoList().get(0).getKeyName());
+  }
+
+  @Test
+  public void testGetDeletedKeysWithStartPrefixProvidedAndPrevKeyEmpty()
+      throws Exception {
+    // Prepare mock data in the deletedTable.
+    for (int i = 1; i < 5; i++) {
+      OmKeyInfo omKeyInfo =
+          getOmKeyInfo("sampleVol", "bucketOne", "deleted_key_" + i, true);
+      reconOMMetadataManager.getDeletedTable()
+          .put("/sampleVol/bucketOne/deleted_key_" + i, new 
RepeatedOmKeyInfo(omKeyInfo));
+    }
+    for (int i = 5; i < 10; i++) {
+      OmKeyInfo omKeyInfo =
+          getOmKeyInfo("sampleVol", "bucketTwo", "deleted_key_" + i, true);
+      reconOMMetadataManager.getDeletedTable()
+          .put("/sampleVol/bucketTwo/deleted_key_" + i, new 
RepeatedOmKeyInfo(omKeyInfo));
+    }
+
+    // Case 3: startPrefix provided, prevKey empty
+    Response deletedKeyInfoResponse =
+        omdbInsightEndpoint.getDeletedKeyInfo(5, "",
+            "/sampleVol/bucketOne/");
+    KeyInsightInfoResponse keyInsightInfoResp =
+        (KeyInsightInfoResponse) deletedKeyInfoResponse.getEntity();
+
+    // Validate that the response retrieves starting from the prefix.
+    assertNotNull(keyInsightInfoResp);
+    assertEquals(4, keyInsightInfoResp.getRepeatedOmKeyInfoList().size());
+    assertEquals("deleted_key_1",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0).getKeyName());
+    assertEquals("deleted_key_4",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(3).getOmKeyInfoList().get(0).getKeyName());
+  }
+
+  @Test
+  public void testGetDeletedKeysWithBothPrevKeyAndStartPrefixProvided()
+      throws IOException {
+    // Prepare mock data in the deletedTable.
+    for (int i = 1; i < 10; i++) {
+      OmKeyInfo omKeyInfo =
+          getOmKeyInfo("sampleVol", "bucketOne", "deleted_key_" + i, true);
+      reconOMMetadataManager.getDeletedTable()
+          .put("/sampleVol/bucketOne/deleted_key_" + i, new 
RepeatedOmKeyInfo(omKeyInfo));
+    }
+    for (int i = 10; i < 15; i++) {
+      OmKeyInfo omKeyInfo =
+          getOmKeyInfo("sampleVol", "bucketTwo", "deleted_key_" + i, true);
+      reconOMMetadataManager.getDeletedTable()
+          .put("/sampleVol/bucketTwo/deleted_key_" + i, new 
RepeatedOmKeyInfo(omKeyInfo));
+    }
+
+    // Case 4: startPrefix and prevKey provided
+    Response deletedKeyInfoResponse =
+        omdbInsightEndpoint.getDeletedKeyInfo(5,
+            "/sampleVol/bucketOne/deleted_key_5",
+            "/sampleVol/bucketOne/");
+
+    KeyInsightInfoResponse keyInsightInfoResp =
+        (KeyInsightInfoResponse) deletedKeyInfoResponse.getEntity();
+
+    // Validate that the response retrieves starting from the prefix and skips 
the prevKey.
+    assertNotNull(keyInsightInfoResp);
+    assertEquals(4, keyInsightInfoResp.getRepeatedOmKeyInfoList().size());
+    assertEquals("deleted_key_6",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(0).getOmKeyInfoList().get(0).getKeyName());
+    assertEquals("deleted_key_9",
+        
keyInsightInfoResp.getRepeatedOmKeyInfoList().get(3).getOmKeyInfoList().get(0).getKeyName());
+  }
+
+
   private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
                                  String keyName, boolean isFile) {
     return new OmKeyInfo.Builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to