ArafatKhan2198 commented on code in PR #6231: URL: https://github.com/apache/ozone/pull/6231#discussion_r1527471297
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightSearchEndpoint.java: ########## @@ -0,0 +1,457 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import org.antlr.v4.runtime.misc.Pair; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.om.helpers.*; +import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler; +import org.apache.hadoop.ozone.recon.api.types.KeyEntityInfo; +import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse; +import org.apache.hadoop.ozone.recon.api.types.NSSummary; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.scm.ReconContainerManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; 
+import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_FSO; +import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_NON_FSO; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_FSO; +import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_NON_FSO; +import static org.apache.hadoop.ozone.recon.api.handlers.BucketHandler.getBucketHandler; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.normalizePath; +import static org.apache.hadoop.ozone.recon.api.handlers.EntityHandler.parseRequestPath; + +/** + * REST endpoint for search implementation in OM DB Insight. + */ +@Path("/insights") +@Produces(MediaType.APPLICATION_JSON) +@AdminOnly +public class OMDBInsightSearchEndpoint { + + private OzoneStorageContainerManager reconSCM; + private final ReconOMMetadataManager omMetadataManager; + private static final Logger LOG = + LoggerFactory.getLogger(OMDBInsightSearchEndpoint.class); + private ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager; + + + @Inject + public OMDBInsightSearchEndpoint(OzoneStorageContainerManager reconSCM, + ReconOMMetadataManager omMetadataManager, + ReconNamespaceSummaryManagerImpl reconNamespaceSummaryManager) { + this.reconSCM = reconSCM; + this.omMetadataManager = omMetadataManager; + this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; + } + + + /** + * Performs a search for open keys in the Ozone Manager (OM) database using a specified search prefix. 
+ * This endpoint can search across both File System Optimized (FSO) and Object Store (non-FSO) layouts, + * compiling a list of keys that match the given prefix along with their data sizes. + * + * The search prefix may range from the root level ('/') to any specific directory + * or key level (e.g., '/volA/' for everything under 'volA'). The search operation matches + * the prefix against the start of keys' names within the OM DB. + * + * Example Usage: + * 1. A searchPrefix of "/" will return all keys in the database. + * 2. A searchPrefix of "/volA/" retrieves every key under volume 'volA'. + * 3. Specifying "/volA/bucketA/dir1" focuses the search within 'dir1' inside 'bucketA' of 'volA'. + * + * @param searchPrefix The prefix for searching keys, starting from the root ('/') or any specific path. + * @param includeFso Indicates whether to include FSO layout keys in the search. + * @param includeNonFso Indicates whether to include non-FSO layout keys in the search. + * @param limit Limits the number of returned keys. + * @return A KeyInsightInfoResponse, containing matching keys and their data sizes. + * @throws IOException On failure to access the OM database or process the operation. 
+ */ + @GET + @Path("/openKeys/search") + public Response searchOpenKeys( + @QueryParam("searchPrefix") + String searchPrefix, + @DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_FSO) @QueryParam(RECON_OPEN_KEY_INCLUDE_FSO) + boolean includeFso, + @DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_NON_FSO) @QueryParam(RECON_OPEN_KEY_INCLUDE_NON_FSO) + boolean includeNonFso, + @DefaultValue("10") @QueryParam("limit") + int limit) throws IOException { + if (searchPrefix == null || searchPrefix.trim().isEmpty()) { + return createBadRequestResponse( + "The searchPrefix query parameter is required."); + } + + KeyInsightInfoResponse insightResponse = new KeyInsightInfoResponse(); + long replicatedTotal = 0; + long unreplicatedTotal = 0; + boolean keysFound = false; // Flag to track if any keys are found + + // Fetch keys from OBS layout and convert them into KeyEntityInfo objects + Map<String, OmKeyInfo> obsKeys = new LinkedHashMap<>(); + if (includeNonFso) { + obsKeys = searchOpenKeysInOBS(searchPrefix, limit); + for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getNonFSOKeyInfoList() + .add(keyEntityInfo); // Add to non-FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + } + } + + // Fetch keys from FSO layout, if the limit is not yet reached + if (includeFso) { + Map<String, OmKeyInfo> fsoKeys = searchOpenKeysInFSO(searchPrefix, limit); + for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) { + keysFound = true; + KeyEntityInfo keyEntityInfo = + createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue()); + insightResponse.getFsoKeyInfoList() + .add(keyEntityInfo); // Add to FSO list + replicatedTotal += entry.getValue().getReplicatedSize(); + unreplicatedTotal += entry.getValue().getDataSize(); + } + } + + // If no keys were found, return a response 
indicating that no keys matched + if (!keysFound) { + return noMatchedKeysResponse(searchPrefix); + } + + // Set the aggregated totals in the response + insightResponse.setReplicatedDataSize(replicatedTotal); + insightResponse.setUnreplicatedDataSize(unreplicatedTotal); + + return Response.ok(insightResponse).build(); + } + + /** + * Creates a KeyEntityInfo object from an OmKeyInfo object and the corresponding key. + * + * @param dbKey The key in the database corresponding to the OmKeyInfo object. + * @param keyInfo The OmKeyInfo object to create the KeyEntityInfo from. + * @return The KeyEntityInfo object created from the OmKeyInfo object and the key. + */ + private KeyEntityInfo createKeyEntityInfoFromOmKeyInfo(String dbKey, + OmKeyInfo keyInfo) { + KeyEntityInfo keyEntityInfo = new KeyEntityInfo(); + keyEntityInfo.setKey(dbKey); // Set the DB key + keyEntityInfo.setPath( + keyInfo.getKeyName()); // Assuming path is the same as key name + keyEntityInfo.setInStateSince(keyInfo.getCreationTime()); + keyEntityInfo.setSize(keyInfo.getDataSize()); + keyEntityInfo.setReplicatedSize(keyInfo.getReplicatedSize()); + keyEntityInfo.setReplicationConfig(keyInfo.getReplicationConfig()); + return keyEntityInfo; + } + + + public Map<String, OmKeyInfo> searchOpenKeysInOBS(String searchPrefix, + int limit) + throws IOException { + + Map<String, OmKeyInfo> matchedKeys = + new LinkedHashMap<>(); // Preserves the insertion order + Table<String, OmKeyInfo> openKeyTable = + omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY); + + try ( + TableIterator<String, ? 
extends Table.KeyValue<String, OmKeyInfo>> + keyIter = openKeyTable.iterator()) { + keyIter.seek(searchPrefix); + while (keyIter.hasNext() && matchedKeys.size() < limit) { + Table.KeyValue<String, OmKeyInfo> entry = keyIter.next(); + String dbKey = entry.getKey(); // Get the DB key + if (!dbKey.startsWith(searchPrefix)) { + break; // Exit the loop if the key no longer matches the prefix + } + // Add the DB key and OmKeyInfo object to the map + matchedKeys.put(dbKey, entry.getValue()); + } + } catch (NullPointerException | IOException exception) { + createInternalServerErrorResponse( + "Error retrieving keys from openKeyTable for path: " + searchPrefix); + } + + return matchedKeys; + } + + + public Map<String, OmKeyInfo> searchOpenKeysInFSO(String searchPrefix, + int limit) + throws IOException { + Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>(); + // Convert the search prefix to an object path for FSO buckets + String searchPrefixObjectPath = convertToObjectPath(searchPrefix); + String[] names = parseRequestPath(searchPrefixObjectPath); + + // If names.length > 2, then the search prefix is at the volume or bucket level hence + // no need to find parent or extract id's or find subpaths as the openFileTable is + // suitable for volume and bucket level search + if (names.length > 2) { + // Fetch the parent ID to search for + long parentId = Long.parseLong(names[names.length - 1]); + + // Fetch the nameSpaceSummary for the parent ID + NSSummary parentSummary = + reconNamespaceSummaryManager.getNSSummary(parentId); + if (parentSummary == null) { + return matchedKeys; + } + List<String> subPaths = new ArrayList<>(); + // Add the initial search prefix object path because it can have both openFiles + // and sub-directories with openFiles + subPaths.add(searchPrefixObjectPath); + + // Recursively gather all subpaths + gatherSubPaths(parentId, subPaths, names); + + // Iterate over the subpaths and retrieve the open files + for (String subPath : subPaths) { + 
matchedKeys.putAll( + retrieveKeysFromOpenFileTable(subPath, limit - matchedKeys.size())); + if (matchedKeys.size() >= limit) { + break; + } + } + return matchedKeys; + } + + // Iterate over for bucket and volume level search + matchedKeys.putAll(retrieveKeysFromOpenFileTable(searchPrefixObjectPath, + limit - matchedKeys.size())); + return matchedKeys; + } + + + private Map<String, OmKeyInfo> retrieveKeysFromOpenFileTable(String subPath, + int limit) + throws IOException { + + Map<String, OmKeyInfo> matchedKeys = + new LinkedHashMap<>(); // Preserves the insertion order + Table<String, OmKeyInfo> openFileTable = + omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED); + + try ( + TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = + openFileTable.iterator()) { + keyIter.seek(subPath); + while (keyIter.hasNext() && matchedKeys.size() < limit) { + Table.KeyValue<String, OmKeyInfo> entry = keyIter.next(); + String dbKey = entry.getKey(); // Get the DB key + if (!dbKey.startsWith(subPath)) { + break; // Exit the loop if the key no longer matches the prefix + } + // Add the DB key and OmKeyInfo object to the map + matchedKeys.put(dbKey, entry.getValue()); + } + } catch (NullPointerException | IOException exception) { Review Comment: Thanks, we will no longer be catching the NPE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
