cshuo commented on code in PR #18348: URL: https://github.com/apache/hudi/pull/18348#discussion_r3043978561
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema; + +/** + * Base implementation for record-index. + */ +@Slf4j +public abstract class BaseRecordIndexer extends BaseIndexer { + + private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; + + protected BaseRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + return initializeRecordIndexPartition(null, latestMergedPartitionFileSliceList, recordIndexMaxParallelism); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + String dataPartition, + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + log.info("Initializing record index from {} file slices", latestMergedPartitionFileSliceList.size()); + HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot( + engineContext, + latestMergedPartitionFileSliceList, + recordIndexMaxParallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + dataTableWriteConfig); + + // Initialize the file groups + final int fileGroupCount = estimateFileGroupCount(records); + log.info("Initializing record index with {} file groups.", fileGroupCount); + return new DataPartitionAndRecords(fileGroupCount, Option.ofNullable(dataPartition), records); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath); + // Validate record index after commit if validation is enabled + if (dataTableWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled()) { + validateRecordIndex(records, fileGroupCount, metadataMetaClient); + } + records.unpersist(); + } + + /** + * Validates the record index after bootstrap by comparing the expected record count with the actual + * record count stored in the metadata table. The validation is performed in a distributed manner + * using the engine context to count records from HFiles in parallel. + * + * @param recordIndexRecords the HoodieData containing the expected records + * @param fileGroupCount the expected number of file groups + * @param metadataMetaClient meta client for the metadata table + */ + protected void validateRecordIndex(HoodieData<HoodieRecord> recordIndexRecords, int fileGroupCount, HoodieTableMetaClient metadataMetaClient) { + String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath(); + HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient); + try { + // Use merged file slices to handle cases with pending compactions + List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, fsView, partitionName); + + // Filter to only file slices with base files and extract their storage paths + List<StoragePath> baseFilePaths = fileSlices.stream() + .filter(fs -> fs.getBaseFile().isPresent()) + .map(fs -> fs.getBaseFile().get().getStoragePath()) + .collect(Collectors.toList()); + + // Count records in a distributed manner using the engine context + long totalRecords = countRecordsInHFiles(baseFilePaths, metadataMetaClient); + long expectedRecordCount = recordIndexRecords.count(); + + ValidationUtils.checkArgument(totalRecords == expectedRecordCount, "Record Count Validation failed with " + + totalRecords + " present in record index vs the expected " + expectedRecordCount); + log.info(String.format("Record index initialized on %d shards (expected = %d) with %d records (expected = %d)", + fileSlices.size(), fileGroupCount, totalRecords, expectedRecordCount)); + } finally { + fsView.close(); + } + } + + /** + * Counts the total number of records in HFiles in a distributed manner. + * + * @param baseFilePaths list of storage paths to HFiles + * @param metadataMetaClient meta client for the metadata table + * @return total number of records across all HFiles + */ + private long countRecordsInHFiles(List<StoragePath> baseFilePaths, HoodieTableMetaClient metadataMetaClient) { + if (baseFilePaths.isEmpty()) { + return 0L; + } + + int parallelism = Math.min(baseFilePaths.size(), dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + StorageConfiguration<?> storageConfBroadcast = metadataMetaClient.getStorageConf(); + HoodieFileFormat baseFileFormat = metadataMetaClient.getTableConfig().getBaseFileFormat(); + + return engineContext.parallelize(baseFilePaths, parallelism) + .mapPartitions(pathIterator -> { + long count = 0L; + while (pathIterator.hasNext()) { + StoragePath path = pathIterator.next(); + try { + HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConfBroadcast); + HoodieConfig readerConfig = new HoodieConfig(); + HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage) + .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(readerConfig, path, baseFileFormat, Option.empty()); + try { + count += reader.getTotalRecords(); + } finally { + reader.close(); + } + } catch (IOException e) { + throw new HoodieIOException("Error reading total records from file " + path, e); + } + } + return Collections.singletonList(count).iterator(); + }, true) + .collectAsList() + .stream() + .mapToLong(Long::longValue) + .sum(); + } + + /** + * Fetch record locations from FileSlice snapshot. + * + * @param engineContext context ot use. + * @param partitionFileSlicePairs list of pairs of partition and file slice. + * @param recordIndexMaxParallelism parallelism to use. + * @param activeModule active module of interest. + * @param metaClient metaclient instance to use. + * @param dataWriteConfig write config to use. + * @return metadata records for initializing record index entries. + */ + protected <T> HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot( + HoodieEngineContext engineContext, + List<FileSliceAndPartition> partitionFileSlicePairs, + int recordIndexMaxParallelism, + String activeModule, + HoodieTableMetaClient metaClient, + HoodieWriteConfig dataWriteConfig) { + if (partitionFileSlicePairs.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + Option<String> instantTime = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants() + .lastInstant() + .map(HoodieInstant::requestedTime); + if (!instantTime.isPresent()) { + return engineContext.emptyHoodieData(); + } + + engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices"); + final int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism); + ReaderContextFactory<T> readerContextFactory = engineContext.getReaderContextFactory(metaClient); + return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndFileSlice -> { + final String partition = partitionAndFileSlice.partitionPath(); + final FileSlice fileSlice = partitionAndFileSlice.fileSlice(); + final String fileId = fileSlice.getFileId(); + HoodieReaderContext<T> readerContext = readerContextFactory.getContext(); + HoodieSchema dataSchema = HoodieSchemaCache.intern(HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField())); + HoodieSchema requestedSchema = metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema() Review Comment: There is a callback to close the underlying closeable iterator of HoodieData, like `CloseableIteratorListener`, so the file group reader will be closed after the closeable iterator finishes iterating. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
