devmadhuu commented on code in PR #7796: URL: https://github.com/apache/ozone/pull/7796#discussion_r1959035335
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import com.google.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; +import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; +import org.jooq.DSLContext; + +/** + * Task for ObjectStore (OBS) which processes the KEY_TABLE. + */ +public class FileSizeCountTaskOBS implements ReconOmTask { + private static final org.slf4j.Logger LOG = Review Comment: This seems unused. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import com.google.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; +import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; +import org.jooq.DSLContext; + +/** + * Task for FileSystemOptimized (FSO) which processes the FILE_TABLE. + */ +public class FileSizeCountTaskFSO implements ReconOmTask { + private static final org.slf4j.Logger LOG = Review Comment: This seems unused. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.recon.ReconConstants; +import org.apache.hadoop.ozone.recon.ReconUtils; +import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; +import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize; +import org.jooq.DSLContext; +import org.jooq.Record3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class that encapsulates the common code for file size count tasks. + */ +public abstract class FileSizeCountTaskHelper { + protected static final Logger LOG = LoggerFactory.getLogger(FileSizeCountTaskHelper.class); + + /** + * Truncates the FILE_COUNT_BY_SIZE table if it has not been truncated yet. + * + * @param dslContext DSLContext for executing DB commands. 
+ */ + public static void truncateTableIfNeeded(DSLContext dslContext) { + if (ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.compareAndSet(false, true)) { + int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute(); + LOG.info("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE); + } else { + LOG.info("Table already truncated by another task; skipping deletion."); + } + } + + /** + * Executes the reprocess method for the given task. + * + * @param omMetadataManager OM metadata manager. + * @param dslContext DSLContext for DB operations. + * @param fileCountBySizeDao DAO for file count table. + * @param bucketLayout The bucket layout to process. + * @param taskName The name of the task for logging. + * @return A Pair of task name and boolean indicating success. + */ + public static Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager, + DSLContext dslContext, + FileCountBySizeDao fileCountBySizeDao, + BucketLayout bucketLayout, + String taskName) { + LOG.info("Starting Reprocess for {}", taskName); + Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>(); + long startTime = System.currentTimeMillis(); + truncateTableIfNeeded(dslContext); + boolean status = reprocessBucketLayout( + bucketLayout, omMetadataManager, fileSizeCountMap, dslContext, fileCountBySizeDao, taskName); + if (!status) { + return new ImmutablePair<>(taskName, false); + } + writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao); + long endTime = System.currentTimeMillis(); + LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime - startTime)); + return new ImmutablePair<>(taskName, true); + } + + /** + * Iterates over the OM DB keys for the given bucket layout and updates the fileSizeCountMap. + * + * @param bucketLayout The bucket layout to use. + * @param omMetadataManager OM metadata manager. + * @param fileSizeCountMap Map accumulating file size counts. + * @param dslContext DSLContext for DB operations. 
+ * @param fileCountBySizeDao DAO for file count table. + * @param taskName The name of the task for logging. + * @return true if processing succeeds, false otherwise. + */ + public static boolean reprocessBucketLayout(BucketLayout bucketLayout, + OMMetadataManager omMetadataManager, + Map<FileSizeCountKey, Long> fileSizeCountMap, + DSLContext dslContext, + FileCountBySizeDao fileCountBySizeDao, + String taskName) { + Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout); + int totalKeysProcessed = 0; + try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = + omKeyInfoTable.iterator()) { + while (keyIter.hasNext()) { + Table.KeyValue<String, OmKeyInfo> kv = keyIter.next(); + handlePutKeyEvent(kv.getValue(), fileSizeCountMap); + totalKeysProcessed++; + + // Flush to DB periodically. + if (fileSizeCountMap.size() >= 100000) { + writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao); + fileSizeCountMap.clear(); + } + } + } catch (IOException ioEx) { + LOG.error("Unable to populate File Size Count for {} in Recon DB.", taskName, ioEx); + return false; + } + LOG.info("Reprocessed {} keys for bucket layout {}.", totalKeysProcessed, bucketLayout); + return true; + } + + /** + * Processes a batch of OM update events. + * + * @param events OM update event batch. + * @param bucketLayout The bucket layout for which either keyTable or fileTable is fetched + * @param dslContext DSLContext for DB operations. + * @param fileCountBySizeDao DAO for file count table. + * @param taskName The name of the task for logging. + * @return A Pair of task name and boolean indicating success. + */ + public static Pair<String, Boolean> processEvents(OMUpdateEventBatch events, + String bucketLayout, Review Comment: ```suggestion String tableName, ``` This argument appears to be a table name rather than a bucket layout, so the current argument name is confusing. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
