devmadhuu commented on code in PR #6231: URL: https://github.com/apache/ozone/pull/6231#discussion_r1609301039
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java: ########## @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler; +import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo; +import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse; +import org.apache.hadoop.ozone.recon.api.types.NSSummary; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; +import 
javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; + +import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_START_PREFIX; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse; +import static org.apache.hadoop.ozone.recon.ReconUtils.constructObjectPathWithPrefix; +import static org.apache.hadoop.ozone.recon.ReconUtils.validateNames; +import static org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath; + +/** + * REST endpoint for search implementation in OM DB Insight. 
+ */ +@Path("/keys") +@Produces(MediaType.APPLICATION_JSON) +@AdminOnly +public class OMDBInsightSearchEndpoint { + + private OzoneStorageContainerManager reconSCM; + private final ReconOMMetadataManager omMetadataManager; + private static final Logger LOG = + LoggerFactory.getLogger(OMDBInsightSearchEndpoint.class); + private ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager; + + + @Inject + public OMDBInsightSearchEndpoint(OzoneStorageContainerManager reconSCM, + ReconOMMetadataManager omMetadataManager, + ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager) { + this.reconSCM = reconSCM; + this.omMetadataManager = omMetadataManager; + this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; + } + + + /** + * Performs a search for open keys in the Ozone Manager (OM) database using a specified search prefix. + * This endpoint searches across both File System Optimized (FSO) and Object Store (non-FSO) layouts, + * compiling a list of keys that match the given prefix along with their data sizes. + * <p> + * The search prefix must start from the bucket level ('/volumeName/bucketName/') or any specific directory + * or key level (e.g., '/volA/bucketA/dir1' for everything under 'dir1' inside 'bucketA' of 'volA'). + * The search operation matches the prefix against the start of keys' names within the OM DB. + * <p> + * Example Usage: + * 1. A startPrefix of "/volA/bucketA/" retrieves every key under bucket 'bucketA' in volume 'volA'. + * 2. Specifying "/volA/bucketA/dir1" focuses the search within 'dir1' inside 'bucketA' of 'volA'. + * + * @param startPrefix The prefix for searching keys, starting from the bucket level or any specific path. + * @param limit Limits the number of returned keys. + * @param prevKey The key to start after for the next set of records. + * @return A KeyInsightInfoResponse, containing matching keys and their data sizes. + * @throws IOException On failure to access the OM database or process the operation. 
+ */ + @GET + @Path("/open/search") + public Response searchOpenKeys( + @DefaultValue(DEFAULT_START_PREFIX) @QueryParam("startPrefix") + String startPrefix, + @DefaultValue(RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT) @QueryParam("limit") + int limit, + @DefaultValue(RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY) @QueryParam("prevKey") String prevKey) throws IOException { + + try { + // Ensure startPrefix is not null or empty and starts with '/' + if (startPrefix == null || startPrefix.length() == 0) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + startPrefix = startPrefix.startsWith("/") ? startPrefix : "/" + startPrefix; + + // Split the path to ensure it's at least at the bucket level + String[] pathComponents = startPrefix.split("/"); + if (pathComponents.length < 3 || pathComponents[2].isEmpty()) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + + // Ensure the limit is non-negative + limit = Math.max(0, limit); + + // Initialize response object + KeyInsightInfoResponse insightResponse = new KeyInsightInfoResponse(); + long replicatedTotal = 0; + long unreplicatedTotal = 0; + boolean keysFound = false; // Flag to track if any keys are found + String lastKey = null; + + // Search for non-fso keys in KeyTable + Table<String, OmKeyInfo> openKeyTable = + omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY); + Map<String, OmKeyInfo> obsKeys = + retrieveKeysFromTable(openKeyTable, startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getNonFSOKeyInfoList() + .add(keyEntityInfo); // Add to non-FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // 
Search for fso keys in FileTable + Map<String, OmKeyInfo> fsoKeys = searchOpenKeysInFSO(startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getFsoKeyInfoList() + .add(keyEntityInfo); // Add to FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // If no keys were found, return a response indicating that no keys matched + if (!keysFound) { + return noMatchedKeysResponse(startPrefix); + } + + // Set the aggregated totals in the response + insightResponse.setReplicatedDataSize(replicatedTotal); + insightResponse.setUnreplicatedDataSize(unreplicatedTotal); + insightResponse.setLastKey(lastKey); + + // Return the response with the matched keys and their data sizes + return Response.ok(insightResponse).build(); + } catch (IOException e) { + // Handle IO exceptions and return an internal server error response + return createInternalServerErrorResponse( + "Error searching open keys in OM DB: " + e.getMessage()); + } catch (IllegalArgumentException e) { + // Handle illegal argument exceptions and return a bad request response + return createBadRequestResponse( + "Invalid startPrefix: " + e.getMessage()); + } + } + + public Map<String, OmKeyInfo> searchOpenKeysInFSO(String startPrefix, + int limit, String prevKey) + throws IOException, IllegalArgumentException { + Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>(); + // Convert the search prefix to an object path for FSO buckets + String startPrefixObjectPath = convertToObjectPath(startPrefix); + String[] names = parseRequestPath(startPrefixObjectPath); + Table<String, OmKeyInfo> openFileTable = + omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED); + + // If names.length <= 2, then the search 
prefix is at the volume or bucket level hence + // no need to find parent or extract id's or find subpaths as the openFileTable is + // suitable for volume and bucket level search + if (names.length > 2) { + // Fetch the parent ID to search for + long parentId = Long.parseLong(names[names.length - 1]); + + // Fetch the nameSpaceSummary for the parent ID + NSSummary parentSummary = + reconNamespaceSummaryManager.getNSSummary(parentId); + if (parentSummary == null) { + return matchedKeys; + } + List<String> subPaths = new ArrayList<>(); + // Add the initial search prefix object path because it can have both openFiles + // and subdirectories with openFiles + subPaths.add(startPrefixObjectPath); + + // Recursively gather all subpaths + gatherSubPaths(parentId, subPaths, names); + + // Iterate over the subpaths and retrieve the open files + for (String subPath : subPaths) { + matchedKeys.putAll(retrieveKeysFromTable(openFileTable, subPath, + limit - matchedKeys.size(), prevKey)); + if (matchedKeys.size() >= limit) { + break; + } + } + return matchedKeys; + } + + // Iterate over for bucket and volume level search + matchedKeys.putAll(retrieveKeysFromTable(openFileTable, startPrefixObjectPath, limit, prevKey)); + return matchedKeys; + } + + /** + * Finds all subdirectories under a parent directory in an FSO bucket. It builds + * a list of paths for these subdirectories. These sub-directories are then used + * to search for open files in the openFileTable. + * + * How it works: + * - Starts from a parent directory identified by parentId. + * - Looks through all child directories of this parent. + * - For each child, it creates a path that starts with volumeID/bucketID/parentId, + * following our openFileTable format + * - Adds these paths to a list and explores each child further for more subdirectories. + * + * @param parentId The ID of the directory we start exploring from. + * @param subPaths A list where we collect paths to all subdirectories. 
+ * @param names An array with at least two elements: the first is volumeID and + * the second is bucketID. These are used to start each path. + * @throws IOException If there are problems accessing directory information. + */ + private void gatherSubPaths(long parentId, List<String> subPaths, + String[] names) throws IOException { + // Fetch the NSSummary object for parentId + NSSummary parentSummary = reconNamespaceSummaryManager.getNSSummary(parentId); + if (parentSummary == null) { + return; + } + long volumeID = Long.parseLong(names[0]); + long bucketID = Long.parseLong(names[1]); + Set<Long> childDirIds = parentSummary.getChildDir(); + for (Long childId : childDirIds) { + // Fetch the NSSummary for each child directory + NSSummary childSummary = reconNamespaceSummaryManager.getNSSummary(childId); + if (childSummary != null) { + String subPath = constructObjectPathWithPrefix(volumeID, bucketID, childId); + // Add to subPaths + subPaths.add(subPath); + // Recurse into this child directory + gatherSubPaths(childId, subPaths, names); + } + } + } + + + /** + * Converts a key prefix into an object path for FSO buckets, using IDs. + * <p> + * This method transforms a user-provided path (e.g., "volume/bucket/dir1") into + * a database-friendly format ("/volumeID/bucketID/ParentId/") by replacing names + * with their corresponding IDs. It simplifies database queries for FSO bucket operations. + * + * @param prevKeyPrefix The path to be converted, not including key or directory names/IDs. + * @return The object path as "/volumeID/bucketID/ParentId/" or an empty string if an error occurs. + * @throws IOException If database access fails. 
+ */ + public String convertToObjectPath(String prevKeyPrefix) throws IOException, IllegalArgumentException { + try { + String[] names = parseRequestPath(normalizePath(prevKeyPrefix, BucketLayout.FILE_SYSTEM_OPTIMIZED)); + + // Root-Level :- Return the original path + if (names.length == 0) { + return prevKeyPrefix; + } + + // Volume-Level :- Fetch the volumeID + String volumeName = names[0]; + validateNames(volumeName); + String volumeKey = omMetadataManager.getVolumeKey(volumeName); + long volumeId = omMetadataManager.getVolumeTable().getSkipCache(volumeKey).getObjectID(); + if (names.length == 1) { + return constructObjectPathWithPrefix(volumeId); + } + + // Bucket-Level :- Fetch the bucketID + String bucketName = names[1]; + validateNames(bucketName); + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().getSkipCache(bucketKey); + long bucketId = bucketInfo.getObjectID(); + if (names.length == 2) { + return constructObjectPathWithPrefix(volumeId, bucketId); + } + + // Fetch the immediate parentID which could be a directory or the bucket itself + BucketHandler handler = getBucketHandler(reconNamespaceSummaryManager, omMetadataManager, reconSCM, bucketInfo); + long dirObjectId = handler.getDirInfo(names).getObjectID(); Review Comment: Here this might throw an exception for Legacy bucket and startPrefix is > 2. Pls check the impl for getDirInfo which throws exception for Legacy bucket path. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java: ########## @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler; +import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo; +import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse; +import org.apache.hadoop.ozone.recon.api.types.NSSummary; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; + +import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_START_PREFIX; +import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse; +import static org.apache.hadoop.ozone.recon.ReconUtils.constructObjectPathWithPrefix; +import static org.apache.hadoop.ozone.recon.ReconUtils.validateNames; +import static org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath; + +/** + * REST endpoint for search implementation in OM DB Insight. + */ +@Path("/keys") +@Produces(MediaType.APPLICATION_JSON) +@AdminOnly +public class OMDBInsightSearchEndpoint { + + private OzoneStorageContainerManager reconSCM; + private final ReconOMMetadataManager omMetadataManager; + private static final Logger LOG = + LoggerFactory.getLogger(OMDBInsightSearchEndpoint.class); + private ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager; + + + @Inject + public OMDBInsightSearchEndpoint(OzoneStorageContainerManager reconSCM, + ReconOMMetadataManager omMetadataManager, + ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager) { + this.reconSCM = reconSCM; + this.omMetadataManager = omMetadataManager; + this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; + } + + + /** + * Performs a search for open keys in the Ozone Manager (OM) database using a specified search prefix. + * This endpoint searches across both File System Optimized (FSO) and Object Store (non-FSO) layouts, + * compiling a list of keys that match the given prefix along with their data sizes. 
+ * <p> + * The search prefix must start from the bucket level ('/volumeName/bucketName/') or any specific directory + * or key level (e.g., '/volA/bucketA/dir1' for everything under 'dir1' inside 'bucketA' of 'volA'). + * The search operation matches the prefix against the start of keys' names within the OM DB. + * <p> + * Example Usage: + * 1. A startPrefix of "/volA/bucketA/" retrieves every key under bucket 'bucketA' in volume 'volA'. + * 2. Specifying "/volA/bucketA/dir1" focuses the search within 'dir1' inside 'bucketA' of 'volA'. + * + * @param startPrefix The prefix for searching keys, starting from the bucket level or any specific path. + * @param limit Limits the number of returned keys. + * @param prevKey The key to start after for the next set of records. + * @return A KeyInsightInfoResponse, containing matching keys and their data sizes. + * @throws IOException On failure to access the OM database or process the operation. + */ + @GET + @Path("/open/search") + public Response searchOpenKeys( + @DefaultValue(DEFAULT_START_PREFIX) @QueryParam("startPrefix") + String startPrefix, + @DefaultValue(RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT) @QueryParam("limit") + int limit, + @DefaultValue(RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY) @QueryParam("prevKey") String prevKey) throws IOException { + + try { + // Ensure startPrefix is not null or empty and starts with '/' + if (startPrefix == null || startPrefix.length() == 0) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + startPrefix = startPrefix.startsWith("/") ? 
startPrefix : "/" + startPrefix; + + // Split the path to ensure it's at least at the bucket level + String[] pathComponents = startPrefix.split("/"); + if (pathComponents.length < 3 || pathComponents[2].isEmpty()) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + + // Ensure the limit is non-negative + limit = Math.max(0, limit); + + // Initialize response object + KeyInsightInfoResponse insightResponse = new KeyInsightInfoResponse(); + long replicatedTotal = 0; + long unreplicatedTotal = 0; + boolean keysFound = false; // Flag to track if any keys are found + String lastKey = null; + + // Search for non-fso keys in KeyTable + Table<String, OmKeyInfo> openKeyTable = + omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY); + Map<String, OmKeyInfo> obsKeys = + retrieveKeysFromTable(openKeyTable, startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getNonFSOKeyInfoList() + .add(keyEntityInfo); // Add to non-FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // Search for fso keys in FileTable + Map<String, OmKeyInfo> fsoKeys = searchOpenKeysInFSO(startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getFsoKeyInfoList() + .add(keyEntityInfo); // Add to FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // If no keys were found, return a response indicating that no keys matched + if (!keysFound) { + 
return noMatchedKeysResponse(startPrefix); + } + + // Set the aggregated totals in the response + insightResponse.setReplicatedDataSize(replicatedTotal); + insightResponse.setUnreplicatedDataSize(unreplicatedTotal); + insightResponse.setLastKey(lastKey); + + // Return the response with the matched keys and their data sizes + return Response.ok(insightResponse).build(); + } catch (IOException e) { + // Handle IO exceptions and return an internal server error response + return createInternalServerErrorResponse( + "Error searching open keys in OM DB: " + e.getMessage()); + } catch (IllegalArgumentException e) { + // Handle illegal argument exceptions and return a bad request response + return createBadRequestResponse( + "Invalid startPrefix: " + e.getMessage()); + } + } + + public Map<String, OmKeyInfo> searchOpenKeysInFSO(String startPrefix, + int limit, String prevKey) + throws IOException, IllegalArgumentException { + Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>(); + // Convert the search prefix to an object path for FSO buckets + String startPrefixObjectPath = convertToObjectPath(startPrefix); + String[] names = parseRequestPath(startPrefixObjectPath); + Table<String, OmKeyInfo> openFileTable = + omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED); + + // If names.length <= 2, then the search prefix is at the volume or bucket level hence + // no need to find parent or extract id's or find subpaths as the openFileTable is + // suitable for volume and bucket level search + if (names.length > 2) { + // Fetch the parent ID to search for + long parentId = Long.parseLong(names[names.length - 1]); + + // Fetch the nameSpaceSummary for the parent ID + NSSummary parentSummary = + reconNamespaceSummaryManager.getNSSummary(parentId); + if (parentSummary == null) { + return matchedKeys; + } + List<String> subPaths = new ArrayList<>(); + // Add the initial search prefix object path because it can have both openFiles + // and subdirectories with 
openFiles + subPaths.add(startPrefixObjectPath); + + // Recursively gather all subpaths + gatherSubPaths(parentId, subPaths, names); + + // Iterate over the subpaths and retrieve the open files + for (String subPath : subPaths) { + matchedKeys.putAll(retrieveKeysFromTable(openFileTable, subPath, + limit - matchedKeys.size(), prevKey)); + if (matchedKeys.size() >= limit) { + break; + } + } + return matchedKeys; + } + + // Iterate over for bucket and volume level search + matchedKeys.putAll(retrieveKeysFromTable(openFileTable, startPrefixObjectPath, limit, prevKey)); + return matchedKeys; + } + + /** + * Finds all subdirectories under a parent directory in an FSO bucket. It builds + * a list of paths for these subdirectories. These sub-directories are then used + * to search for open files in the openFileTable. + * + * How it works: + * - Starts from a parent directory identified by parentId. + * - Looks through all child directories of this parent. + * - For each child, it creates a path that starts with volumeID/bucketID/parentId, + * following our openFileTable format + * - Adds these paths to a list and explores each child further for more subdirectories. + * + * @param parentId The ID of the directory we start exploring from. + * @param subPaths A list where we collect paths to all subdirectories. + * @param names An array with at least two elements: the first is volumeID and + * the second is bucketID. These are used to start each path. + * @throws IOException If there are problems accessing directory information. 
+ */ + private void gatherSubPaths(long parentId, List<String> subPaths, + String[] names) throws IOException { + // Fetch the NSSummary object for parentId + NSSummary parentSummary = reconNamespaceSummaryManager.getNSSummary(parentId); + if (parentSummary == null) { + return; + } + long volumeID = Long.parseLong(names[0]); + long bucketID = Long.parseLong(names[1]); + Set<Long> childDirIds = parentSummary.getChildDir(); + for (Long childId : childDirIds) { + // Fetch the NSSummary for each child directory + NSSummary childSummary = reconNamespaceSummaryManager.getNSSummary(childId); + if (childSummary != null) { + String subPath = constructObjectPathWithPrefix(volumeID, bucketID, childId); + // Add to subPaths + subPaths.add(subPath); + // Recurse into this child directory + gatherSubPaths(childId, subPaths, names); + } + } + } + + + /** + * Converts a key prefix into an object path for FSO buckets, using IDs. + * <p> + * This method transforms a user-provided path (e.g., "volume/bucket/dir1") into + * a database-friendly format ("/volumeID/bucketID/ParentId/") by replacing names + * with their corresponding IDs. It simplifies database queries for FSO bucket operations. + * + * @param prevKeyPrefix The path to be converted, not including key or directory names/IDs. + * @return The object path as "/volumeID/bucketID/ParentId/" or an empty string if an error occurs. + * @throws IOException If database access fails. + */ + public String convertToObjectPath(String prevKeyPrefix) throws IOException, IllegalArgumentException { Review Comment: In what use case, it will throw `IllegalArgumentException`, pls check, I have a doubt, and even if this method throws in some scenario, then pls write javadoc comment. ########## hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java: ########## @@ -0,0 +1,546 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; + +import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse; +import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; +import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; +import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO; +import org.junit.jupiter.api.BeforeEach; + +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager; +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO; +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm; 
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenFileToOm; +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenKeyToOm; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.recon.ReconTestInjector; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import javax.ws.rs.core.Response; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS; + +/** + * Test for OMDBInsightSearchEndpoint. + */ +public class TestOMDBInsightSearchEndpoint extends AbstractReconSqlDBTest { Review Comment: Add a test case with legacy bucket key using startPrefix path of exact key path, I think that getDirInfo will throw exception in this case and may not work. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java: ########## @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler; +import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo; +import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse; +import org.apache.hadoop.ozone.recon.api.types.NSSummary; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; + +import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_START_PREFIX; +import static 
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse; +import static org.apache.hadoop.ozone.recon.ReconUtils.constructObjectPathWithPrefix; +import static org.apache.hadoop.ozone.recon.ReconUtils.validateNames; +import static org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath; + +/** + * REST endpoint for search implementation in OM DB Insight. + */ +@Path("/keys") +@Produces(MediaType.APPLICATION_JSON) +@AdminOnly +public class OMDBInsightSearchEndpoint { + + private OzoneStorageContainerManager reconSCM; + private final ReconOMMetadataManager omMetadataManager; + private static final Logger LOG = + LoggerFactory.getLogger(OMDBInsightSearchEndpoint.class); + private ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager; + + + @Inject + public OMDBInsightSearchEndpoint(OzoneStorageContainerManager reconSCM, + ReconOMMetadataManager omMetadataManager, + ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager) { + this.reconSCM = reconSCM; + this.omMetadataManager = omMetadataManager; + this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; + } + + + /** + * Performs a search for open keys in the Ozone Manager (OM) database using a specified search prefix. + * This endpoint searches across both File System Optimized (FSO) and Object Store (non-FSO) layouts, + * compiling a list of keys that match the given prefix along with their data sizes. 
+ * <p> + * The search prefix must start from the bucket level ('/volumeName/bucketName/') or any specific directory + * or key level (e.g., '/volA/bucketA/dir1' for everything under 'dir1' inside 'bucketA' of 'volA'). + * The search operation matches the prefix against the start of keys' names within the OM DB. + * <p> + * Example Usage: + * 1. A startPrefix of "/volA/bucketA/" retrieves every key under bucket 'bucketA' in volume 'volA'. + * 2. Specifying "/volA/bucketA/dir1" focuses the search within 'dir1' inside 'bucketA' of 'volA'. + * + * @param startPrefix The prefix for searching keys, starting from the bucket level or any specific path. + * @param limit Limits the number of returned keys. + * @param prevKey The key to start after for the next set of records. + * @return A KeyInsightInfoResponse, containing matching keys and their data sizes. + * @throws IOException On failure to access the OM database or process the operation. + */ + @GET + @Path("/open/search") + public Response searchOpenKeys( + @DefaultValue(DEFAULT_START_PREFIX) @QueryParam("startPrefix") + String startPrefix, + @DefaultValue(RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT) @QueryParam("limit") + int limit, + @DefaultValue(RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY) @QueryParam("prevKey") String prevKey) throws IOException { + + try { + // Ensure startPrefix is not null or empty and starts with '/' + if (startPrefix == null || startPrefix.length() == 0) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + startPrefix = startPrefix.startsWith("/") ? 
startPrefix : "/" + startPrefix; + + // Split the path to ensure it's at least at the bucket level + String[] pathComponents = startPrefix.split("/"); + if (pathComponents.length < 3 || pathComponents[2].isEmpty()) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + + // Ensure the limit is non-negative + limit = Math.max(0, limit); + + // Initialize response object + KeyInsightInfoResponse insightResponse = new KeyInsightInfoResponse(); + long replicatedTotal = 0; + long unreplicatedTotal = 0; + boolean keysFound = false; // Flag to track if any keys are found + String lastKey = null; + + // Search for non-fso keys in KeyTable + Table<String, OmKeyInfo> openKeyTable = + omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY); + Map<String, OmKeyInfo> obsKeys = + retrieveKeysFromTable(openKeyTable, startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getNonFSOKeyInfoList() + .add(keyEntityInfo); // Add to non-FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // Search for fso keys in FileTable + Map<String, OmKeyInfo> fsoKeys = searchOpenKeysInFSO(startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getFsoKeyInfoList() + .add(keyEntityInfo); // Add to FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // If no keys were found, return a response indicating that no keys matched + if (!keysFound) { + 
return noMatchedKeysResponse(startPrefix); + } + + // Set the aggregated totals in the response + insightResponse.setReplicatedDataSize(replicatedTotal); + insightResponse.setUnreplicatedDataSize(unreplicatedTotal); + insightResponse.setLastKey(lastKey); + + // Return the response with the matched keys and their data sizes + return Response.ok(insightResponse).build(); + } catch (IOException e) { + // Handle IO exceptions and return an internal server error response + return createInternalServerErrorResponse( + "Error searching open keys in OM DB: " + e.getMessage()); + } catch (IllegalArgumentException e) { + // Handle illegal argument exceptions and return a bad request response + return createBadRequestResponse( + "Invalid startPrefix: " + e.getMessage()); + } + } + + public Map<String, OmKeyInfo> searchOpenKeysInFSO(String startPrefix, + int limit, String prevKey) + throws IOException, IllegalArgumentException { + Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>(); + // Convert the search prefix to an object path for FSO buckets + String startPrefixObjectPath = convertToObjectPath(startPrefix); + String[] names = parseRequestPath(startPrefixObjectPath); + Table<String, OmKeyInfo> openFileTable = + omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED); + + // If names.length <= 2, then the search prefix is at the volume or bucket level hence + // no need to find parent or extract id's or find subpaths as the openFileTable is + // suitable for volume and bucket level search + if (names.length > 2) { + // Fetch the parent ID to search for + long parentId = Long.parseLong(names[names.length - 1]); + + // Fetch the nameSpaceSummary for the parent ID + NSSummary parentSummary = + reconNamespaceSummaryManager.getNSSummary(parentId); + if (parentSummary == null) { + return matchedKeys; + } + List<String> subPaths = new ArrayList<>(); + // Add the initial search prefix object path because it can have both openFiles + // and subdirectories with 
openFiles + subPaths.add(startPrefixObjectPath); + + // Recursively gather all subpaths + gatherSubPaths(parentId, subPaths, names); + + // Iterate over the subpaths and retrieve the open files + for (String subPath : subPaths) { + matchedKeys.putAll(retrieveKeysFromTable(openFileTable, subPath, + limit - matchedKeys.size(), prevKey)); + if (matchedKeys.size() >= limit) { + break; + } + } + return matchedKeys; + } + + // Iterate over for bucket and volume level search + matchedKeys.putAll(retrieveKeysFromTable(openFileTable, startPrefixObjectPath, limit, prevKey)); + return matchedKeys; + } + + /** + * Finds all subdirectories under a parent directory in an FSO bucket. It builds + * a list of paths for these subdirectories. These sub-directories are then used + * to search for open files in the openFileTable. + * + * How it works: + * - Starts from a parent directory identified by parentId. + * - Looks through all child directories of this parent. + * - For each child, it creates a path that starts with volumeID/bucketID/parentId, + * following our openFileTable format + * - Adds these paths to a list and explores each child further for more subdirectories. + * + * @param parentId The ID of the directory we start exploring from. + * @param subPaths A list where we collect paths to all subdirectories. + * @param names An array with at least two elements: the first is volumeID and + * the second is bucketID. These are used to start each path. + * @throws IOException If there are problems accessing directory information. 
+ */ + private void gatherSubPaths(long parentId, List<String> subPaths, + String[] names) throws IOException { + // Fetch the NSSummary object for parentId + NSSummary parentSummary = reconNamespaceSummaryManager.getNSSummary(parentId); + if (parentSummary == null) { + return; + } + long volumeID = Long.parseLong(names[0]); + long bucketID = Long.parseLong(names[1]); + Set<Long> childDirIds = parentSummary.getChildDir(); + for (Long childId : childDirIds) { + // Fetch the NSSummary for each child directory + NSSummary childSummary = reconNamespaceSummaryManager.getNSSummary(childId); + if (childSummary != null) { + String subPath = constructObjectPathWithPrefix(volumeID, bucketID, childId); + // Add to subPaths + subPaths.add(subPath); + // Recurse into this child directory + gatherSubPaths(childId, subPaths, names); + } + } + } + + + /** + * Converts a key prefix into an object path for FSO buckets, using IDs. + * <p> + * This method transforms a user-provided path (e.g., "volume/bucket/dir1") into + * a database-friendly format ("/volumeID/bucketID/ParentId/") by replacing names + * with their corresponding IDs. It simplifies database queries for FSO bucket operations. + * + * @param prevKeyPrefix The path to be converted, not including key or directory names/IDs. + * @return The object path as "/volumeID/bucketID/ParentId/" or an empty string if an error occurs. + * @throws IOException If database access fails. 
+ */ + public String convertToObjectPath(String prevKeyPrefix) throws IOException, IllegalArgumentException { + try { + String[] names = parseRequestPath(normalizePath(prevKeyPrefix, BucketLayout.FILE_SYSTEM_OPTIMIZED)); + + // Root-Level :- Return the original path + if (names.length == 0) { + return prevKeyPrefix; + } + + // Volume-Level :- Fetch the volumeID + String volumeName = names[0]; + validateNames(volumeName); + String volumeKey = omMetadataManager.getVolumeKey(volumeName); + long volumeId = omMetadataManager.getVolumeTable().getSkipCache(volumeKey).getObjectID(); + if (names.length == 1) { + return constructObjectPathWithPrefix(volumeId); + } + + // Bucket-Level :- Fetch the bucketID + String bucketName = names[1]; + validateNames(bucketName); + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().getSkipCache(bucketKey); + long bucketId = bucketInfo.getObjectID(); + if (names.length == 2) { + return constructObjectPathWithPrefix(volumeId, bucketId); + } + + // Fetch the immediate parentID which could be a directory or the bucket itself + BucketHandler handler = getBucketHandler(reconNamespaceSummaryManager, omMetadataManager, reconSCM, bucketInfo); + long dirObjectId = handler.getDirInfo(names).getObjectID(); + return constructObjectPathWithPrefix(volumeId, bucketId, dirObjectId); + } catch (NullPointerException e) { Review Comment: We shouldn't catch NullPointerException here; catch RuntimeException instead. ########## hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOMDBInsightSearchEndpoint.java: ########## @@ -0,0 +1,546 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; + +import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse; +import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; +import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; +import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO; +import org.junit.jupiter.api.BeforeEach; + +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager; +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO; +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm; +import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenFileToOm; +import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenKeyToOm; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.recon.ReconTestInjector; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import javax.ws.rs.core.Response; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS; + +/** + * Test for OMDBInsightSearchEndpoint. 
+ */ +public class TestOMDBInsightSearchEndpoint extends AbstractReconSqlDBTest { + + @TempDir + private Path temporaryFolder; + private ReconOMMetadataManager reconOMMetadataManager; + private OMDBInsightSearchEndpoint omdbInsightSearchEndpoint; + private OzoneConfiguration ozoneConfiguration; + private static final String ROOT_PATH = "/"; + private static final String TEST_USER = "TestUser"; + private OMMetadataManager omMetadataManager; + + private ReconNamespaceSummaryManager reconNamespaceSummaryManager; + + @BeforeEach + public void setUp() throws Exception { + ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, + 100); + omMetadataManager = initializeNewOmMetadataManager( + Files.createDirectory(temporaryFolder.resolve("JunitOmDBDir")) + .toFile()); + reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager, + Files.createDirectory(temporaryFolder.resolve("OmMetataDir")).toFile()); + + ReconTestInjector reconTestInjector = + new ReconTestInjector.Builder(temporaryFolder.toFile()) + .withReconSqlDb() + .withReconOm(reconOMMetadataManager) + .withOmServiceProvider(mock(OzoneManagerServiceProviderImpl.class)) + .addBinding(OzoneStorageContainerManager.class, + ReconStorageContainerManagerFacade.class) + .withContainerDB() + .addBinding(StorageContainerServiceProvider.class, + mock(StorageContainerServiceProviderImpl.class)) + .addBinding(OMDBInsightEndpoint.class) + .addBinding(ContainerHealthSchemaManager.class) + .build(); + reconNamespaceSummaryManager = + reconTestInjector.getInstance(ReconNamespaceSummaryManager.class); + omdbInsightSearchEndpoint = reconTestInjector.getInstance( + OMDBInsightSearchEndpoint.class); + + // populate OM DB and reprocess into Recon RocksDB + populateOMDB(); + NSSummaryTaskWithFSO nSSummaryTaskWithFso = + new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, + reconOMMetadataManager, ozoneConfiguration); + 
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); + } + + /** + * Create a new OM Metadata manager instance with one user, one vol, and two + * buckets. + * + * @throws IOException ioEx + */ + private static OMMetadataManager initializeNewOmMetadataManager( + File omDbDir) + throws IOException { + OzoneConfiguration omConfiguration = new OzoneConfiguration(); + omConfiguration.set(OZONE_OM_DB_DIRS, + omDbDir.getAbsolutePath()); + OMMetadataManager omMetadataManager = new OmMetadataManagerImpl( + omConfiguration, null); + return omMetadataManager; + } + + @Test + public void testVolumeLevelSearchRestriction() throws IOException { + // Test with volume level path + String volumePath = "/vola"; + Response response = omdbInsightSearchEndpoint.searchOpenKeys(volumePath, 20, ""); + assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + String entity = (String) response.getEntity(); + assertTrue(entity.contains("Invalid startPrefix: Path must be at the bucket level or deeper"), + "Expected a message indicating the path must be at the bucket level or deeper"); + + // Test with another volume level path + volumePath = "/volb"; + response = omdbInsightSearchEndpoint.searchOpenKeys(volumePath, 20, ""); + assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + entity = (String) response.getEntity(); + assertTrue(entity.contains("Invalid startPrefix: Path must be at the bucket level or deeper"), + "Expected a message indicating the path must be at the bucket level or deeper"); + } + + @Test + public void testRootLevelSearchRestriction() throws IOException { + // Test with root level path + String rootPath = "/"; + Response response = omdbInsightSearchEndpoint.searchOpenKeys(rootPath, 20, ""); + assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + String entity = (String) response.getEntity(); + assertTrue(entity.contains("Invalid startPrefix: Path must be at the bucket level or 
deeper"), + "Expected a message indicating the path must be at the bucket level or deeper"); + + // Test with root level path without trailing slash + rootPath = ""; + response = omdbInsightSearchEndpoint.searchOpenKeys(rootPath, 20, ""); + assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + entity = (String) response.getEntity(); + assertTrue(entity.contains("Invalid startPrefix: Path must be at the bucket level or deeper"), + "Expected a message indicating the path must be at the bucket level or deeper"); + } + + @Test + public void testBucketLevelSearch() throws IOException { + Response response = + omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1", 20, ""); + assertEquals(200, response.getStatus()); + KeyInsightInfoResponse result = + (KeyInsightInfoResponse) response.getEntity(); + assertEquals(5, result.getFsoKeyInfoList().size()); + assertEquals(0, result.getNonFSOKeyInfoList().size()); + // Assert Total Size + assertEquals(5000, result.getUnreplicatedDataSize()); + assertEquals(5000 * 3, result.getReplicatedDataSize()); + + response = + omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1", 20, ""); + assertEquals(200, response.getStatus()); + result = + (KeyInsightInfoResponse) response.getEntity(); + assertEquals(5, result.getNonFSOKeyInfoList().size()); + assertEquals(0, result.getFsoKeyInfoList().size()); + // Assert Total Size + assertEquals(5000, result.getUnreplicatedDataSize()); + assertEquals(5000 * 3, result.getReplicatedDataSize()); + } + + @Test + public void testDirectoryLevelSearch() throws IOException { + Response response = + omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira1", 20, ""); + assertEquals(200, response.getStatus()); + KeyInsightInfoResponse result = + (KeyInsightInfoResponse) response.getEntity(); + assertEquals(1, result.getFsoKeyInfoList().size()); + assertEquals(0, result.getNonFSOKeyInfoList().size()); + // Assert Total Size + assertEquals(1000, 
result.getUnreplicatedDataSize()); + assertEquals(1000 * 3, result.getReplicatedDataSize()); + + response = + omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira2", 20, ""); + assertEquals(200, response.getStatus()); + result = + (KeyInsightInfoResponse) response.getEntity(); + assertEquals(1, result.getFsoKeyInfoList().size()); + assertEquals(0, result.getNonFSOKeyInfoList().size()); + // Assert Total Size + assertEquals(1000, result.getUnreplicatedDataSize()); + assertEquals(1000 * 3, result.getReplicatedDataSize()); + + response = + omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1/dira3", 20, ""); + assertEquals(200, response.getStatus()); + result = + (KeyInsightInfoResponse) response.getEntity(); + assertEquals(1, result.getFsoKeyInfoList().size()); + assertEquals(0, result.getNonFSOKeyInfoList().size()); + // Assert Total Size + assertEquals(1000, result.getUnreplicatedDataSize()); + assertEquals(1000 * 3, result.getReplicatedDataSize()); + } + + @Test + public void testLimitSearch() throws IOException { + Response response = + omdbInsightSearchEndpoint.searchOpenKeys("/vola/bucketa1", 2, ""); + assertEquals(200, response.getStatus()); + KeyInsightInfoResponse result = + (KeyInsightInfoResponse) response.getEntity(); + assertEquals(2, result.getFsoKeyInfoList().size()); + assertEquals(0, result.getNonFSOKeyInfoList().size()); + } + + @Test + public void testSearchOpenKeysWithNoMatchFound() throws IOException { + // Given a search prefix that matches no keys + String searchPrefix = "unknown-volume/unknown-bucket/"; + + Response response = + omdbInsightSearchEndpoint.searchOpenKeys(searchPrefix, 10, ""); + + // Then the response should indicate that no keys were found + assertEquals(Response.Status.NOT_FOUND.getStatusCode(), + response.getStatus(), "Expected a 404 NOT FOUND status"); + + String entity = (String) response.getEntity(); + assertTrue(entity.contains("No keys matched the search prefix"), + "Expected a message indicating no keys 
were found"); + } + + @Test + public void testSearchOpenKeysWithBadRequest() throws IOException { + // Give a negative limit + int negativeLimit = -1; + Response response = omdbInsightSearchEndpoint.searchOpenKeys("@323232", negativeLimit, ""); + + // Then the response should indicate that the request was bad + assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), + response.getStatus(), "Expected a 400 BAD REQUEST status"); + + String entity = (String) response.getEntity(); + assertTrue(entity.contains("Invalid startPrefix: Path must be at the bucket level or deeper"), + "Expected a message indicating the path must be at the bucket level or deeper"); + } + + @Test + public void testLastKeyInResponse() throws IOException { + Response response = + omdbInsightSearchEndpoint.searchOpenKeys("/volb/bucketb1", 20, ""); + assertEquals(200, response.getStatus()); + KeyInsightInfoResponse result = + (KeyInsightInfoResponse) response.getEntity(); + assertEquals(0, result.getFsoKeyInfoList().size()); + assertEquals(5, result.getNonFSOKeyInfoList().size()); + // Assert Total Size + assertEquals(5000, result.getUnreplicatedDataSize()); + assertEquals(5000 * 3, result.getReplicatedDataSize()); + // Assert Last Key + assertEquals(ROOT_PATH + "volb/bucketb1/fileb5", result.getLastKey(), + "Expected last key to be 'fileb5'"); + } + + @Test + public void testSearchOpenKeysWithPagination() throws IOException { + // Set the initial parameters + String startPrefix = "/volb/bucketb1"; + int limit = 2; + String prevKey = ""; + + // Perform the first search request + Response response = omdbInsightSearchEndpoint.searchOpenKeys(startPrefix, limit, prevKey); + assertEquals(200, response.getStatus()); + KeyInsightInfoResponse result = (KeyInsightInfoResponse) response.getEntity(); + assertEquals(2, result.getNonFSOKeyInfoList().size()); + assertEquals(0, result.getFsoKeyInfoList().size()); + + // Extract the last key from the response + prevKey = result.getLastKey(); + 
assertNotNull(prevKey, "Last key should not be null"); + + // Perform the second search request using the last key + response = omdbInsightSearchEndpoint.searchOpenKeys(startPrefix, limit, prevKey); + assertEquals(200, response.getStatus()); + result = (KeyInsightInfoResponse) response.getEntity(); + assertEquals(2, result.getNonFSOKeyInfoList().size()); + assertEquals(0, result.getFsoKeyInfoList().size()); + + // Extract the last key from the response + prevKey = result.getLastKey(); + assertNotNull(prevKey, "Last key should not be null"); + + // Perform the third search request using the last key + response = omdbInsightSearchEndpoint.searchOpenKeys(startPrefix, limit, prevKey); + assertEquals(200, response.getStatus()); + result = (KeyInsightInfoResponse) response.getEntity(); + assertEquals(1, result.getNonFSOKeyInfoList().size()); + assertEquals(0, result.getFsoKeyInfoList().size()); Review Comment: Add an assertion here that lastKey is empty on the last page. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java: ########## @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler; +import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo; +import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse; +import org.apache.hadoop.ozone.recon.api.types.NSSummary; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; + +import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_START_PREFIX; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.noMatchedKeysResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createBadRequestResponse; +import static org.apache.hadoop.ozone.recon.ReconResponseUtils.createInternalServerErrorResponse; +import static org.apache.hadoop.ozone.recon.ReconUtils.constructObjectPathWithPrefix; +import static org.apache.hadoop.ozone.recon.ReconUtils.validateNames; +import 
static org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath; + +/** + * REST endpoint for search implementation in OM DB Insight. + */ +@Path("/keys") +@Produces(MediaType.APPLICATION_JSON) +@AdminOnly +public class OMDBInsightSearchEndpoint { + + private OzoneStorageContainerManager reconSCM; + private final ReconOMMetadataManager omMetadataManager; + private static final Logger LOG = + LoggerFactory.getLogger(OMDBInsightSearchEndpoint.class); + private ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager; + + + @Inject + public OMDBInsightSearchEndpoint(OzoneStorageContainerManager reconSCM, + ReconOMMetadataManager omMetadataManager, + ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager) { + this.reconSCM = reconSCM; + this.omMetadataManager = omMetadataManager; + this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; + } + + + /** + * Performs a search for open keys in the Ozone Manager (OM) database using a specified search prefix. + * This endpoint searches across both File System Optimized (FSO) and Object Store (non-FSO) layouts, + * compiling a list of keys that match the given prefix along with their data sizes. + * <p> + * The search prefix must start from the bucket level ('/volumeName/bucketName/') or any specific directory + * or key level (e.g., '/volA/bucketA/dir1' for everything under 'dir1' inside 'bucketA' of 'volA'). + * The search operation matches the prefix against the start of keys' names within the OM DB. + * <p> + * Example Usage: + * 1. A startPrefix of "/volA/bucketA/" retrieves every key under bucket 'bucketA' in volume 'volA'. + * 2. Specifying "/volA/bucketA/dir1" focuses the search within 'dir1' inside 'bucketA' of 'volA'. 
+ * + * @param startPrefix The prefix for searching keys, starting from the bucket level or any specific path. + * @param limit Limits the number of returned keys. + * @param prevKey The key to start after for the next set of records. + * @return A KeyInsightInfoResponse, containing matching keys and their data sizes. + * @throws IOException On failure to access the OM database or process the operation. + */ + @GET + @Path("/open/search") + public Response searchOpenKeys( + @DefaultValue(DEFAULT_START_PREFIX) @QueryParam("startPrefix") + String startPrefix, + @DefaultValue(RECON_OPEN_KEY_DEFAULT_SEARCH_LIMIT) @QueryParam("limit") + int limit, + @DefaultValue(RECON_OPEN_KEY_SEARCH_DEFAULT_PREV_KEY) @QueryParam("prevKey") String prevKey) throws IOException { + + try { + // Ensure startPrefix is not null or empty and starts with '/' + if (startPrefix == null || startPrefix.length() == 0) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + startPrefix = startPrefix.startsWith("/") ? 
startPrefix : "/" + startPrefix; + + // Split the path to ensure it's at least at the bucket level + String[] pathComponents = startPrefix.split("/"); + if (pathComponents.length < 3 || pathComponents[2].isEmpty()) { + return createBadRequestResponse( + "Invalid startPrefix: Path must be at the bucket level or deeper."); + } + + // Ensure the limit is non-negative + limit = Math.max(0, limit); + + // Initialize response object + KeyInsightInfoResponse insightResponse = new KeyInsightInfoResponse(); + long replicatedTotal = 0; + long unreplicatedTotal = 0; + boolean keysFound = false; // Flag to track if any keys are found + String lastKey = null; + + // Search for non-fso keys in KeyTable + Table<String, OmKeyInfo> openKeyTable = + omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY); + Map<String, OmKeyInfo> obsKeys = + retrieveKeysFromTable(openKeyTable, startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getNonFSOKeyInfoList() + .add(keyEntityInfo); // Add to non-FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // Search for fso keys in FileTable + Map<String, OmKeyInfo> fsoKeys = searchOpenKeysInFSO(startPrefix, limit, prevKey); + for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getFsoKeyInfoList() + .add(keyEntityInfo); // Add to FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + lastKey = entry.getKey(); // Update lastKey + } + + // If no keys were found, return a response indicating that no keys matched + if (!keysFound) { + 
return noMatchedKeysResponse(startPrefix); + } + + // Set the aggregated totals in the response + insightResponse.setReplicatedDataSize(replicatedTotal); + insightResponse.setUnreplicatedDataSize(unreplicatedTotal); + insightResponse.setLastKey(lastKey); + + // Return the response with the matched keys and their data sizes + return Response.ok(insightResponse).build(); + } catch (IOException e) { + // Handle IO exceptions and return an internal server error response + return createInternalServerErrorResponse( + "Error searching open keys in OM DB: " + e.getMessage()); + } catch (IllegalArgumentException e) { Review Comment: In which use case will this throw `IllegalArgumentException`? Please check — I have a doubt that it ever does. If the method can throw it in some scenario, please document that scenario with a javadoc `@throws` comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
