yihua commented on code in PR #18348: URL: https://github.com/apache/hudi/pull/18348#discussion_r3048419158
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.columnstats; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.index.HoodieIndexUtils.register; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; + +/** + * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata + */ +@Slf4j +public class ColumnStatsIndexer extends BaseIndexer { + private final Lazy<List<String>> columnsToIndex; + + public ColumnStatsIndexer(HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.columnsToIndex = Lazy.lazily(() -> + new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex( + dataTableMetaClient.getTableConfig(), + dataTableWriteConfig.getMetadataConfig(), + Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)), + true, + Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()), + existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataTableMetaClient)).keySet())); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + final int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + if (partitionToAllFilesMap.isEmpty() || columnsToIndex.get().isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData())); + } + + log.info("Indexing {} columns for column stats index", columnsToIndex.get().size()); + // during initialization, we need stats for base and log files. + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToAllFilesMap, + dataTableMetaClient, dataTableWriteConfig.getColumnStatsIndexParallelism(), + dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(), + columnsToIndex.get()); + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), records)); + } Review Comment: 🤖 When `columnsToIndex` is empty and `buildInitialization` returns empty data, `postInitialization` will still be called and will register an index definition with empty source fields. Is that intentional? It seems like registering a column stats index with no source fields could confuse downstream readers. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -432,107 +412,78 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad partitionInfoList = Collections.emptyList(); } } - Map<String, Map<String, Long>> partitionIdToAllFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()); - return Pair.of(partitionName, p.getFilenameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - // validate that each index is eligible to be initialized - Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator(); - while (iterator.hasNext()) { - MetadataPartitionType partitionType = iterator.next(); - if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) { - // Partition stats index cannot be enabled for a non-partitioned table - iterator.remove(); - this.enabledPartitionTypes.remove(partitionType); - } - } + Map<String, List<FileInfo>> partitionIdToAllFilesMap = DirectoryInfo.getPartitionToFileInfo(partitionInfoList); + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); - // For a fresh table, defer RLI initialization - if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() && this.enabledPartitionTypes.contains(RECORD_INDEX) - && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) { - this.enabledPartitionTypes.remove(RECORD_INDEX); - partitionsToInit.remove(RECORD_INDEX); + // FILES partition should always be initialized first if enabled + if (!filesPartitionAvailable) { + initializeMetadataPartition(FILES, indexerMapForPartitionsToInit.get(FILES), + dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + hasPartitionsStateChanged = true; } - Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); - for (MetadataPartitionType partitionType : partitionsToInit) { - // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. - String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); - String partitionTypeName = partitionType.name(); - LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition); - String relativePartitionPath; - Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; - Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)); - try { - switch (partitionType) { - case FILES: - fileGroupCountAndRecordsPair = initializeFilesPartition(partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, FILES.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case BLOOM_FILTERS: - fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case COLUMN_STATS: - Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema); - fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue(); - initializeFilegroupsAndCommit(partitionType, COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition, colStatsColumnsAndRecord.getKey()); - break; - case RECORD_INDEX: - boolean isPartitionedRLI = dataWriteConfig.isRecordLevelIndexEnabled(); - initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, lazyLatestMergedPartitionFileSliceList, isPartitionedRLI); - break; - case EXPRESSION_INDEX: - Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (expressionIndexPartitionsToInit.size() != 1) { - if (expressionIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = expressionIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case PARTITION_STATS: - // For PARTITION_STATS, COLUMN_STATS should also be enabled - if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) { - LOG.debug("Skipping partition stats initialization as column stats index is not enabled. Please enable {}", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()); - continue; - } - fileGroupCountAndRecordsPair = initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case SECONDARY_INDEX: - Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (secondaryIndexPartitionsToInit.size() != 1) { - if (secondaryIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = secondaryIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition(relativePartitionPath, lazyLatestMergedPartitionFileSliceList); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - default: - throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType)); - } - } catch (Exception e) { - String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; - metrics.ifPresent(m -> m.setMetric(metricKey, 1)); - String errMsg = String.format("Bootstrap on %s partition failed for %s", - partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); - LOG.error(errMsg, e); - throw new HoodieMetadataException(errMsg, e); + indexerMapForPartitionsToInit.entrySet().stream().filter(e -> e.getKey() != FILES).forEach( + e -> { + try { + initializeMetadataPartition(e.getKey(), e.getValue(), dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + } catch (IOException ex) { + throw new HoodieMetadataException("Failed to initialize metadata partition: " + e.getKey(), ex); + } + hasPartitionsStateChanged = true; + }); + return true; + } + + private void initializeMetadataPartition( + MetadataPartitionType partitionType, + Indexer indexer, Review Comment: 🤖 `hasPartitionsStateChanged = true` is set after `initializeMetadataPartition` returns, even when `buildInitialization` returns an empty list and the method returns early without actually initializing anything. In the old code, `continue` in the for-loop would skip setting this flag. Could you move `hasPartitionsStateChanged = true` inside `initializeMetadataPartition` so it's only set when initialization actually happened? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.secondary; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; +import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices; + +/** + * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index + */ +@Slf4j +public class SecondaryIndexer extends BaseIndexer { + + public SecondaryIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (secondaryIndexPartitionsToInit.size() != 1) { + if (secondaryIndexPartitionsToInit.size() > 1) { + log.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = secondaryIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Secondary Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get(); + + int parallelism = Math.min(partitionFileSlicePairs.size(), dataTableWriteConfig.getMetadataConfig().getSecondaryIndexParallelism()); + HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices( + engineContext, + partitionFileSlicePairs, + parallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + indexDefinition, + dataTableWriteConfig.getProps()); Review Comment: 🤖 This passes `RECORD_INDEX` as the partition type to `estimateFileGroupCount`, but this is for a secondary index partition. The partition type is used in logging, so this will produce misleading log messages (e.g. "Estimated file group count for RECORD_INDEX: ..."). Should this be the actual secondary index partition type instead? ########## hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java: ########## @@ -439,22 +439,22 @@ public static List<MetadataPartitionType> getMetadataPartitionsNeedingWriteStatu return Collections.singletonList(MetadataPartitionType.RECORD_INDEX); } - /** - * Returns the set of all metadata partition names. - */ - public static Set<String> getAllPartitionPaths() { - return Arrays.stream(getValidValues()) - .map(MetadataPartitionType::getPartitionPath) - .collect(Collectors.toSet()); - } - /** * Returns the set of all valid metadata partition types. Prefer using this method over {@link #values()}. */ - public static MetadataPartitionType[] getValidValues() { - // ALL_PARTITIONS is just another record type in FILES partition + public static MetadataPartitionType[] getValidValues(HoodieTableVersion tableVersion) { + if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) { + // ALL_PARTITIONS is just another record type in FILES partition + return EnumSet.complementOf(EnumSet.of( Review Comment: 🤖 For table versions < 8, this now filters out `SECONDARY_INDEX`, `EXPRESSION_INDEX`, and `PARTITION_STATS`. Previously `getValidValues()` returned all types regardless of version. Is this intentional — were these partition types never actually functional on pre-v8 tables? I want to make sure this doesn't accidentally break enabling these indexes on older tables that haven't upgraded. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -432,107 +412,78 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad partitionInfoList = Collections.emptyList(); } } - Map<String, Map<String, Long>> partitionIdToAllFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()); - return Pair.of(partitionName, p.getFilenameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - // validate that each index is eligible to be initialized - Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator(); - while (iterator.hasNext()) { - MetadataPartitionType partitionType = iterator.next(); - if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) { - // Partition stats index cannot be enabled for a non-partitioned table - iterator.remove(); - this.enabledPartitionTypes.remove(partitionType); - } - } + Map<String, List<FileInfo>> partitionIdToAllFilesMap = DirectoryInfo.getPartitionToFileInfo(partitionInfoList); + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); - // For a fresh table, defer RLI initialization - if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() && this.enabledPartitionTypes.contains(RECORD_INDEX) - && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) { - this.enabledPartitionTypes.remove(RECORD_INDEX); - partitionsToInit.remove(RECORD_INDEX); + // FILES partition should always be initialized first if enabled + if (!filesPartitionAvailable) { + initializeMetadataPartition(FILES, indexerMapForPartitionsToInit.get(FILES), + dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + hasPartitionsStateChanged = true; } - Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); - for (MetadataPartitionType partitionType : partitionsToInit) { - // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. - String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); - String partitionTypeName = partitionType.name(); - LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition); - String relativePartitionPath; - Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; - Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)); - try { - switch (partitionType) { - case FILES: - fileGroupCountAndRecordsPair = initializeFilesPartition(partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, FILES.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case BLOOM_FILTERS: - fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case COLUMN_STATS: - Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema); - fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue(); - initializeFilegroupsAndCommit(partitionType, COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition, colStatsColumnsAndRecord.getKey()); - break; - case RECORD_INDEX: - boolean isPartitionedRLI = dataWriteConfig.isRecordLevelIndexEnabled(); - initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, lazyLatestMergedPartitionFileSliceList, isPartitionedRLI); - break; - case EXPRESSION_INDEX: - Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (expressionIndexPartitionsToInit.size() != 1) { - if (expressionIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = expressionIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case PARTITION_STATS: - // For PARTITION_STATS, COLUMN_STATS should also be enabled - if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) { - LOG.debug("Skipping partition stats initialization as column stats index is not enabled. Please enable {}", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()); - continue; - } - fileGroupCountAndRecordsPair = initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case SECONDARY_INDEX: - Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (secondaryIndexPartitionsToInit.size() != 1) { - if (secondaryIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = secondaryIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition(relativePartitionPath, lazyLatestMergedPartitionFileSliceList); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - default: - throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType)); - } - } catch (Exception e) { - String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; - metrics.ifPresent(m -> m.setMetric(metricKey, 1)); - String errMsg = String.format("Bootstrap on %s partition failed for %s", - partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); - LOG.error(errMsg, e); - throw new HoodieMetadataException(errMsg, e); + indexerMapForPartitionsToInit.entrySet().stream().filter(e -> e.getKey() != FILES).forEach( + e -> { + try { + initializeMetadataPartition(e.getKey(), e.getValue(), dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + } catch (IOException ex) { + throw new HoodieMetadataException("Failed to initialize metadata partition: " + e.getKey(), ex); + } + hasPartitionsStateChanged = true; Review Comment: 🤖 The old code used a `List` for `partitionsToInit`, which preserved ordering (FILES always first, then the rest in enum order). The new code uses `HashMap` with FILES handled separately, but remaining partitions iterate in non-deterministic order. Have you verified there are no implicit ordering dependencies between non-FILES partition initializations? For example, does PARTITION_STATS initialization ever depend on COLUMN_STATS being initialized first? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.expression; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX; + +/** + * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index + */ +@Slf4j +public class ExpressionIndexer extends BaseIndexer { + + private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator; + + public ExpressionIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.expressionIndexRecordGenerator = expressionIndexRecordGenerator; + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit( + EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (expressionIndexPartitionsToInit.size() != 1) { + if (expressionIndexPartitionsToInit.size() > 1) { + log.warn("Skipping expression index initialization as only one expression index " + + "bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = expressionIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get(); + List<FileInfoAndPartition> filesToIndex = new ArrayList<>(); + partitionFileSlicePairs.forEach(fsp -> { + if (fsp.fileSlice().getBaseFile().isPresent()) { + filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), fsp.fileSlice().getBaseFile().get().getPath(), fsp.fileSlice().getBaseFile().get().getFileSize())); + } + fsp.fileSlice().getLogFiles() + .forEach(hoodieLogFile + -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))); + }); + + int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); + if (partitionFileSlicePairs.isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, engineContext.emptyHoodieData())); + } + + int parallelism = Math.min(filesToIndex.size(), dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); Review Comment: 🤖 If `partitionFileSlicePairs` is non-empty but every file slice happens to have no base file and no log files, `filesToIndex` would be empty and `parallelism` becomes 0. A zero parallelism value can cause issues downstream (e.g., Spark's `parallelize` with 0 partitions). Could you add an emptiness check on `filesToIndex` similar to the one on `partitionFileSlicePairs`? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE; + +/** + * Base implementation of {@link MetadataPartitionType#RECORD_INDEX} index. + */ +@Slf4j +public abstract class BaseRecordIndexer extends BaseIndexer { + + protected BaseRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + return initializeRecordIndexPartition(null, latestMergedPartitionFileSliceList, recordIndexMaxParallelism); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + String dataPartition, + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + log.info("Initializing record index from {} file slices", latestMergedPartitionFileSliceList.size()); + HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot( + engineContext, + latestMergedPartitionFileSliceList, + recordIndexMaxParallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + dataTableWriteConfig); + + // Initialize the file groups + final int fileGroupCount = estimateFileGroupCount(records); + log.info("Initializing record index with {} file groups.", fileGroupCount); + return new DataPartitionAndRecords(fileGroupCount, Option.ofNullable(dataPartition), records); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath); + // Validate record index after commit if validation is enabled Review Comment: 🤖 The `postInitialization` calls `records.unpersist()` on the union records, but each per-partition HoodieData was individually persisted inside `estimateFileGroupCount` (via `records.persist("MEMORY_AND_DISK_SER")`). For the `PartitionedRecordIndexer` case, are these per-partition persisted datasets also being unpersisted somewhere, or could they leak? ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.experimental.Accessors; + +/** + * Data class representing a file and partition with a flag. + */ +@Getter +@AllArgsConstructor +@Accessors(fluent = true) Review Comment: 🤖 `FileAndPartitionFlag` is passed to `engineContext.parallelize()` in `fetchPartitionFileInfo` (used by both bloom filter and column stats record conversion), which requires Spark serialization. The old `Tuple3` it replaces implements `Serializable`, but this class does not. This will cause a runtime `NotSerializableException` when running on Spark. Could you add `implements Serializable`? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.record.HoodieRecordIndex; +import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; + +/** + * Implementation of the partitioned {@link MetadataPartitionType#RECORD_INDEX} index + */ +@Slf4j +public class PartitionedRecordIndexer extends BaseRecordIndexer { + public PartitionedRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + createRecordIndexDefinition(dataTableMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, "true")); + Map<String, List<FileSliceAndPartition>> partitionFileSlicePairsMap = lazyPartitionFileSlices.get().stream() + .collect(Collectors.groupingBy(FileSliceAndPartition::partitionPath)); + Map<String, DataPartitionAndRecords> fileGroupCountAndRecordsPairMap = new HashMap<>(partitionFileSlicePairsMap.size()); + int maxParallelismPerHudiPartition = partitionFileSlicePairsMap.isEmpty() + ? 1 : Math.max(1, dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism() / partitionFileSlicePairsMap.size()); + int totalFileGroupCount = 0; + for (Map.Entry<String, List<FileSliceAndPartition>> entry: partitionFileSlicePairsMap.entrySet()) { + String partition = entry.getKey(); + List<FileSliceAndPartition> fileSliceAndPartitions = entry.getValue(); + log.info("Initializing partitioned record index from data partition {}", partition); + DataPartitionAndRecords dataPartitionAndRecords = initializeRecordIndexPartition(partition, fileSliceAndPartitions, maxParallelismPerHudiPartition); + fileGroupCountAndRecordsPairMap.put(partition, dataPartitionAndRecords); + totalFileGroupCount += dataPartitionAndRecords.numFileGroups(); Review Comment: 🤖 When `partitionFileSlicePairsMap` is empty (e.g., empty table), `totalFileGroupCount` will be 0 and `initializationList` will be empty, but the code still returns an `IndexPartitionInitialization` with 0 file groups. Could this cause issues downstream? The non-partitioned `RecordIndexer` path goes through `estimateFileGroupCount` which guarantees at least 1 file group via `Math.max(1, ...)`. Is an empty-table scenario expected to be gated earlier in the call chain? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema; + +/** + * Base implementation for record-index. + */ +@Slf4j +public abstract class BaseRecordIndexer extends BaseIndexer { + + private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; + + protected BaseRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + return initializeRecordIndexPartition(null, latestMergedPartitionFileSliceList, recordIndexMaxParallelism); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + String dataPartition, + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + log.info("Initializing record index from {} file slices", latestMergedPartitionFileSliceList.size()); + HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot( + engineContext, + latestMergedPartitionFileSliceList, + recordIndexMaxParallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + dataTableWriteConfig); + + // Initialize the file groups + final int fileGroupCount = estimateFileGroupCount(records); + log.info("Initializing record index with {} file groups.", fileGroupCount); + return new DataPartitionAndRecords(fileGroupCount, Option.ofNullable(dataPartition), records); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath); + // Validate record index after commit if validation is enabled Review Comment: 🤖 Worth noting that since `records` passed to `postInitialization` is the *union* of all per-partition records (built in `initializeMetadataPartition`), calling `records.unpersist()` on a Spark union RDD does not automatically unpersist the parent RDDs that were individually persisted inside `estimateFileGroupCount`. For non-partitioned RLI this is fine (single source), but for partitioned RLI, the individual per-partition records that were persisted may leak. The old code had the same pattern though, so it's not a regression. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
