hudi-agent commented on code in PR #18372: URL: https://github.com/apache/hudi/pull/18372#discussion_r3295885668
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.expression; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieIndexMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer; +import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import 
org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath; + +/** + * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index + */ +@Slf4j +public class ExpressionIndexer extends BaseIndexer { + + private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator; + + public ExpressionIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + 
this.expressionIndexRecordGenerator = expressionIndexRecordGenerator; + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit( + EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (expressionIndexPartitionsToInit.size() != 1) { + if (expressionIndexPartitionsToInit.size() > 1) { + log.warn("Skipping expression index initialization as only one expression index " + + "bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = expressionIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get(); + List<FileInfoAndPartition> filesToIndex = new ArrayList<>(); + partitionFileSlicePairs.forEach(fsp -> { + if (fsp.fileSlice().getBaseFile().isPresent()) { + filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), fsp.fileSlice().getBaseFile().get().getPath(), fsp.fileSlice().getBaseFile().get().getFileSize())); + } + fsp.fileSlice().getLogFiles() + .forEach(hoodieLogFile + -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))); + }); + + int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); + if (filesToIndex.isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, engineContext.emptyHoodieData())); + } + + int parallelism = Math.min(filesToIndex.size(), dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); + HoodieSchema tableSchema = + HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient) + .orElseThrow(() -> new HoodieMetadataException("Table schema is not available for expression index initialization")); + HoodieSchema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataTableMetaClient, tableSchema); + + HoodieData<HoodieRecord> records = expressionIndexRecordGenerator.buildInitialization( + filesToIndex, indexDefinition, dataTableMetaClient, parallelism, + tableSchema, readerSchema, engineContext.getStorageConf(), dataTableInstantTime); + + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, records)); + } + + @Override + public List<IndexPartitionAndRecords> buildUpdate(String instantTime, HoodieBackedTableMetadata tableMetadata, Lazy<HoodieTableFileSystemView> lazyFileSystemView, + HoodieCommitMetadata commitMetadata) { Review Comment: 🤖 `buildClean` throws when `dataTableMetaClient.getIndexMetadata()` is empty, but `ExpressionIndexer` is added to `enabledIndexerMap` whenever `metadataConfig.isExpressionIndexEnabled()` is true — even before any expression index has been bootstrapped. The caller comment in `update(HoodieCleanMetadata)` says "for index partitions not needed to handle, `buildClean` will return an empty list", which `buildUpdate` honors but `buildClean` violates. Could you mirror `buildUpdate`'s `isMetadataPartitionAvailable` check in `buildClean` and return `Collections.emptyList()` instead, so cleans don't fail when the partition isn't bootstrapped yet? <sub><i>- AI-generated; verify before applying.
React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -1592,102 +1099,15 @@ public BatchMetadataConversionFunction(String instantTime, HoodieCommitMetadata } @Override - public Map<String, HoodieData<HoodieRecord>> convertMetadata() { - Map<String, HoodieData<HoodieRecord>> partitionToRecordMap = - HoodieMetadataWriteUtils.convertMetadataToRecords( - engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, getTableMetadata(), - dataWriteConfig.getMetadataConfig(), - partitionsToUpdate, dataWriteConfig.getBloomFilterType(), - dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType(), - Option.of(dataWriteConfig.getRecordMerger().getRecordType())); - - // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code - // to the HoodieTableMetadataUtil class in hudi-common. - if (partitionsToUpdate.contains(RECORD_INDEX.getPartitionPath())) { - HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()), commitMetadata); - partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()).union(additionalUpdates)); - } - if (partitionsToUpdate.stream().anyMatch(partition -> partition.startsWith(EXPRESSION_INDEX.getPartitionPath()))) { - updateExpressionIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); - } - if (partitionsToUpdate.stream().anyMatch(partition -> partition.startsWith(SECONDARY_INDEX.getPartitionPath()))) { - updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime); - } - return partitionToRecordMap; - } - } - - /** - * Update expression index from {@link HoodieCommitMetadata}. 
- */ - private void updateExpressionIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, - Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) { - if (!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(dataMetaClient)) { - return; + public List<IndexPartitionAndRecords> convertMetadata() { + return partitionsToUpdate.stream().flatMap(indexPartition -> { Review Comment: 🤖 This `partitionsToUpdate.stream().flatMap(...)` will invoke `buildUpdate` once per partition path, but `ExpressionIndexer.buildUpdate` / `SecondaryIndexer.buildUpdate` each iterate over ALL their respective metadata partitions and return records for every one. With N expression (or secondary) index partitions, you'll get N×N `IndexPartitionAndRecords` entries with the same `indexPartitionName`, all of which are then union'd in `tagRecordsWithLocation`. Could you dedupe by `MetadataPartitionType` first (e.g. `partitionsToUpdate.stream().map(MetadataPartitionType::fromPartitionPath).distinct()`) so each indexer's `buildUpdate` is called once? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.experimental.Accessors; + +import java.io.Serializable; + +/** + * Data class representing a file and partition with a flag. + */ +@Getter +@AllArgsConstructor +@Accessors(fluent = true) +public class FileAndPartitionFlag implements Serializable { + private static final long serialVersionUID = 1L; Review Comment: 🤖 nit: the `Flag` suffix is pretty opaque — without looking at the fields you can't tell this is about deletion tracking. Something like `PartitionedFileWithDeleteStatus` or just `FileDeleteEntry` might make the intent obvious at call sites. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.secondary; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; +import static 
org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; +import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords; +import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices; + +/** + * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index + */ +@Slf4j +public class SecondaryIndexer extends BaseIndexer { + + public SecondaryIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (secondaryIndexPartitionsToInit.size() != 1) { + if (secondaryIndexPartitionsToInit.size() > 1) { Review Comment: 🤖 nit: the outer `size() != 1` check with a nested `size() > 1` warning-only block makes the two cases (empty vs. too many) harder to see at a glance. Could you separate them into two explicit guards — `if (size() > 1) { log.warn; return empty; }` followed by `if (isEmpty()) { return empty; }` — so each case is self-contained? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/partitionstats/TestPartitionStatsIndexer.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ */ + +package org.apache.hudi.metadata.index.partitionstats; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.HoodieIndexVersion; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; +import org.apache.hudi.util.Lazy; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +class TestPartitionStatsIndexer { + + @Test + void testSkipForNonPartitionedTable() throws IOException { + HoodieEngineContext engineContext = mock(HoodieEngineContext.class); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + + when(metaClient.getTableConfig()).thenReturn(tableConfig); + when(tableConfig.isTablePartitioned()).thenReturn(false); + + PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, writeConfig, metaClient); + List<IndexPartitionInitialization> result = indexer.buildInitialization("001", "002", Collections.emptyMap(), Lazy.lazily(Collections::emptyList)); + assertTrue(result.isEmpty()); + } + + @Test + void testSkipWhenColumnStatsDisabled() throws IOException { + HoodieEngineContext engineContext = mock(HoodieEngineContext.class); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + + when(metaClient.getTableConfig()).thenReturn(tableConfig); + when(tableConfig.isTablePartitioned()).thenReturn(true); + when(writeConfig.isMetadataColumnStatsIndexEnabled()).thenReturn(false); + + PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, writeConfig, metaClient); + List<IndexPartitionInitialization> result = indexer.buildInitialization("001", "002", Collections.emptyMap(), Lazy.lazily(Collections::emptyList)); + assertTrue(result.isEmpty()); + } + + @SuppressWarnings("unchecked") + @Test + void testInitializeWithRealEngineContextAndIndexDataContent() throws IOException { + HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf()); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + + when(metaClient.getTableConfig()).thenReturn(tableConfig); + when(tableConfig.isTablePartitioned()).thenReturn(true); + when(writeConfig.isMetadataColumnStatsIndexEnabled()).thenReturn(true); + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(writeConfig.getRecordMerger()).thenReturn(recordMerger); + when(recordMerger.getRecordType()).thenReturn(HoodieRecord.HoodieRecordType.AVRO); + when(metadataConfig.getPartitionStatsIndexFileGroupCount()).thenReturn(4); + + HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) (HoodieData<?>) engineContext.parallelize( + Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_part", + Collections.singletonMap("f_part.parquet", 33L), Collections.emptyList())), + 1); + + try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) { + mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(any(), any(), any(), any(), any(), any())) + .thenReturn(records); + + PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, writeConfig, metaClient); + List<IndexPartitionInitialization> initializationList = indexer.buildInitialization("001", "002", Collections.emptyMap(), Lazy.lazily(Collections::emptyList)); + assertEquals(1, initializationList.size()); + + assertEquals(4, initializationList.get(0).totalFileGroups()); + List<HoodieRecord> collected = initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList(); + assertEquals(1, collected.size()); + assertEquals("p_part", 
collected.get(0).getRecordKey()); + } + } + + @Test + void testBuildUpdateThrowsWhenColumnStatsPartitionNotAvailable() { + HoodieEngineContext engineContext = mock(HoodieEngineContext.class); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + + when(metaClient.getTableConfig()).thenReturn(tableConfig); + when(tableConfig.isMetadataPartitionAvailable(any(MetadataPartitionType.class))).thenReturn(false); + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(writeConfig.getRecordMerger()).thenReturn(recordMerger); + when(recordMerger.getRecordType()).thenReturn(HoodieRecord.HoodieRecordType.AVRO); + + PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, writeConfig, metaClient); + assertThrows(IllegalStateException.class, () -> indexer.buildUpdate( + "010", + mock(HoodieBackedTableMetadata.class), + Lazy.lazily(() -> mock(HoodieTableFileSystemView.class)), + new HoodieCommitMetadata())); + } + + @Test + @SuppressWarnings("unchecked") + void testBuildUpdateWithNonEmptyCommitMetadataProducesPartitionEntry() { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(getDefaultStorageConf()); + HoodieEngineContext realEngineContext = new HoodieLocalEngineContext(getDefaultStorageConf()); Review Comment: 🤖 nit: both `engineContext` and `realEngineContext` are `HoodieLocalEngineContext` instances here, so the "real" prefix is misleading — it implies the first one is not real. In the similar test in `TestColumnStatsIndexer` the distinction is `mock(HoodieEngineContext.class)` vs `HoodieLocalEngineContext`, which makes the naming earn its keep. 
Could you rename this to something like `testDataEngineContext` or `parallelizeEngineContext` to clarify its role? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1211,35 +725,32 @@ private static List<HoodieRecord> convertMetadataToRollbackRecords(HoodieRollbac * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. */ private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, - Map<String, Map<String, Long>> partitionToAppendedFiles) { + Map<String, List<FileInfoAndPartition>> partitionToAppendedFiles) { rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { // Has this rollback produced new files? boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty(); final String partition = pm.getPartitionPath(); final String partitionId = getPartitionIdentifierForFilesPartition(partition); - BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> { - // if a file exists in both written log files and rollback log files, we want to pick the one that is higher - // as rollback file could have been updated after written log files are computed. - return oldSize > newSizeCopy ? oldSize : newSizeCopy; - }; - if (hasRollbackLogFiles) { Review Comment: 🤖 nit: this immediately converts the existing `List<FileInfoAndPartition>` to a `Map<String, Long>` for merging, then converts the result back to a `List` at line 749. The round-trip is surprising to a reader and only arises because the accumulation type was changed from `Map<String, Long>` to `List<FileInfoAndPartition>`. 
Could you keep a local `Map<String, Map<String, Long>>` as the accumulation structure inside `processRollbackMetadata` and do the conversion to `List<FileInfoAndPartition>` only once, at the point where the result is placed back into `partitionToAppendedFiles`? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.expression; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieIndexMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer; +import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath; + +/** + * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index + */ +@Slf4j +public class ExpressionIndexer extends BaseIndexer { + + private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator; + + public ExpressionIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.expressionIndexRecordGenerator = expressionIndexRecordGenerator; + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit( + EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (expressionIndexPartitionsToInit.size() != 1) { + if (expressionIndexPartitionsToInit.size() > 1) { + log.warn("Skipping expression index initialization as only one expression index " + + "bootstrap at a time is supported for now. 
Provided: {}", expressionIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = expressionIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get(); + List<FileInfoAndPartition> filesToIndex = new ArrayList<>(); + partitionFileSlicePairs.forEach(fsp -> { + if (fsp.fileSlice().getBaseFile().isPresent()) { + filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), fsp.fileSlice().getBaseFile().get().getPath(), fsp.fileSlice().getBaseFile().get().getFileSize())); + } + fsp.fileSlice().getLogFiles() + .forEach(hoodieLogFile + -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))); + }); + + int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); + if (filesToIndex.isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, engineContext.emptyHoodieData())); + } + + int parallelism = Math.min(filesToIndex.size(), dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); + HoodieSchema tableSchema = + HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient) + .orElseThrow(() -> new HoodieMetadataException("Table schema is not available for expression index initialization")); + HoodieSchema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataTableMetaClient, tableSchema); + + HoodieData<HoodieRecord> records = expressionIndexRecordGenerator.buildInitialization( + filesToIndex, indexDefinition, dataTableMetaClient, parallelism, + tableSchema, readerSchema, engineContext.getStorageConf(), 
dataTableInstantTime); + + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, records)); + } + + @Override + public List<IndexPartitionAndRecords> buildUpdate(String instantTime, HoodieBackedTableMetadata tableMetadata, Lazy<HoodieTableFileSystemView> lazyFileSystemView, + HoodieCommitMetadata commitMetadata) { + if (!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(dataTableMetaClient)) { + log.info("Don't need to update expression index, since no expression index is available"); + return Collections.emptyList(); + } + return dataTableMetaClient.getTableConfig().getMetadataPartitions() + .stream() + .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) + .map(partition -> { + HoodieData<HoodieRecord> expressionIndexRecords; + try { + expressionIndexRecords = expressionIndexRecordGenerator.buildUpdate(dataTableMetaClient, tableMetadata, commitMetadata, partition, instantTime); + } catch (Exception e) { + throw new HoodieMetadataException(String.format("Failed to get expression index updates for partition %s", partition), e); + } + return IndexPartitionAndRecords.of(partition, expressionIndexRecords); + }).collect(Collectors.toList()); + } + + @Override + public List<IndexPartitionAndRecords> buildClean(String instantTime, HoodieCleanMetadata cleanMetadata) { + Option<HoodieIndexMetadata> indexMetadata = dataTableMetaClient.getIndexMetadata(); + if (indexMetadata.isEmpty()) { + throw new HoodieMetadataException("Expression index metadata not found"); + } + List<IndexPartitionAndRecords> indexRecordsList = new ArrayList<>(); + HoodieIndexMetadata metadata = indexMetadata.get(); + Map<String, HoodieIndexDefinition> indexDefinitions = metadata.getIndexDefinitions(); + if (indexDefinitions.isEmpty()) { + throw new HoodieMetadataException("Expression index metadata not found"); + } Review Comment: 🤖 nit: this message is identical to the one thrown at line 161, 
so there's no way to tell the two conditions apart in logs. Could you use a distinct message here, e.g. `"Expression index has no index definitions defined"`? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/DirectoryInfo.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.model; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import lombok.Getter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; + +/** + * A class which represents a directory and the files and directories inside it. + * <p> + * A {@code DirectoryInfo} object saves the name of the partition and various properties requires of each file + * required for initializing the metadata table. Saving limited properties reduces the total memory footprint when + * a very large number of files are present in the dataset being initialized. 
+ */ +@Getter +public class DirectoryInfo implements Serializable { + private static final long serialVersionUID = 1L; + + // Relative path of the directory (relative to the base directory) + private final String relativePath; + // Map of filenames within this partition to their respective sizes + private final Map<String, Long> filenameToSizeMap; + // List of directories within this partition + private final List<StoragePath> subDirectories = new ArrayList<>(); + // Is this a hoodie partition + private boolean isHoodiePartition = false; + + public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, String maxInstantTime, Set<String> pendingDataInstants) { + this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true); + } + + /** + * When files are directly fetched from Metadata table we do not need to validate HoodiePartitions. + */ + public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, String maxInstantTime, Set<String> pendingDataInstants, + boolean validateHoodiePartitions) { + this.relativePath = relativePath; + + // Pre-allocate with the maximum length possible + filenameToSizeMap = new HashMap<>(pathInfos.size()); + + // Presence of partition meta file implies this is a HUDI partition + // if input files are directly fetched from MDT, it may not contain the HoodiePartitionMetadata file. So, we can ignore the validation for isHoodiePartition. 
+ isHoodiePartition = !validateHoodiePartitions || pathInfos.stream().anyMatch(status -> status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)); + for (StoragePathInfo pathInfo : pathInfos) { + // Do not attempt to search for more subdirectories inside directories that are partitions + if (!isHoodiePartition && pathInfo.isDirectory()) { + // Ignore .hoodie directory as there cannot be any partitions inside it + if (!pathInfo.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { + this.subDirectories.add(pathInfo.getPath()); + } + } else if (isHoodiePartition && FSUtils.isDataFile(pathInfo.getPath())) { + // Regular HUDI data file (base file or log file) + String dataFileCommitTime = FSUtils.getCommitTime(pathInfo.getPath().getName()); + // Limit the file listings to files which were created by successful commits before the maxInstant time. + if (!pendingDataInstants.contains(dataFileCommitTime) && compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, maxInstantTime)) { + filenameToSizeMap.put(pathInfo.getPath().getName(), pathInfo.getLength()); + } + } + } + } + + public static Map<String, List<FileInfo>> getPartitionToFileInfo(List<DirectoryInfo> partitionInfoList) { + return partitionInfoList.stream() + .map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()); Review Comment: 🤖 nit: the lambda variable `p` is easy to mistake for a partition string given the surrounding context — could you rename it to `dirInfo` or `dir` to make it clear it's a `DirectoryInfo`? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java: ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.columnstats; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.HoodieIndexVersion; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import 
org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath; +import static org.apache.hudi.index.HoodieIndexUtils.register; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnsToIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; + +/** + * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata + */ +@Slf4j +public class ColumnStatsIndexer extends BaseIndexer { + private final Lazy<List<String>> columnsToIndex; + + public ColumnStatsIndexer(HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.columnsToIndex = Lazy.lazily(() -> + new ArrayList<>(getColumnsToIndex( + dataTableMetaClient.getTableConfig(), + dataTableWriteConfig.getMetadataConfig(), + Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)), + true, + Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()), + existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataTableMetaClient)).keySet())); + } 
+ + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + final int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + // columnsToIndex can be empty if meta fields are disabled and cols to index is not explicitly overridden. + if (partitionToAllFilesMap.isEmpty() || columnsToIndex.get().isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData())); + } + + log.info("Indexing {} columns for column stats index", columnsToIndex.get().size()); + // during initialization, we need stats for base and log files. + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToAllFilesMap, + dataTableMetaClient, dataTableWriteConfig.getColumnStatsIndexParallelism(), + dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(), + columnsToIndex.get()); + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), records)); + } + + @Override + public List<IndexPartitionAndRecords> buildUpdate( + String instantTime, + HoodieBackedTableMetadata tableMetadata, + Lazy<HoodieTableFileSystemView> lazyFileSystemView, + HoodieCommitMetadata commitMetadata) { + final HoodieData<HoodieRecord> records = convertMetadataToColumnStatsRecords(commitMetadata, engineContext, + dataTableMetaClient, dataTableWriteConfig.getMetadataConfig(), Option.of(dataTableWriteConfig.getRecordMerger().getRecordType())); + return Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(), records)); + } + + @Override + public List<IndexPartitionAndRecords> buildClean(String 
instantTime, HoodieCleanMetadata cleanMetadata) { + final HoodieData<HoodieRecord> records = + convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataTableMetaClient, + dataTableWriteConfig.getMetadataConfig(), Option.of(dataTableWriteConfig.getRecordMerger().getRecordType())); + return Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(), records)); + } + + @Override + public List<IndexPartitionAndRecords> buildRestore(String instantTime, List<String> deletedPartitions, Map<String, List<FileInfo>> filesAdded, Map<String, List<String>> filesDeleted) { + if (filesDeleted.isEmpty() && filesAdded.isEmpty()) { + return Collections.emptyList(); + } + Lazy<Option<HoodieSchema>> tableSchema = + Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)); + final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex( Review Comment: 🤖 nit: this local `columnsToIndex` has the same name as the class field (line 74) but is computed with a different `populateMetaFields` argument (`false` here vs. `true` in the field). Could you rename it to something like `restoreColumnsToIndex` and add a short comment explaining why the argument differs, to avoid a future reader swapping in `this.columnsToIndex.get()` and silently changing behavior? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
