yihua commented on code in PR #18348: URL: https://github.com/apache/hudi/pull/18348#discussion_r3036289765
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/ExpressionIndexRecordGenerator.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.storage.StorageConfiguration; + +import java.util.List; + +/** + * Engine-specific generator for expression index records used during metadata index bootstrap. + */ +public interface ExpressionIndexRecordGenerator { + EngineType getEngineType(); + + /** + * Generates expression index records. Review Comment: 🤖 nit: the `@param filesToIndex` javadoc still says "triplet of partition, (file path, file size)" but the type is now `List<FileInfoAndPartition>` — worth updating the description to match the new model class. 
########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/record/TestRecordIndexer.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.util.Lazy; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class TestRecordIndexer { + + @Test + void testGetDataCreatesDefinitionAndReturnsInitialization() throws IOException { + 
HoodieEngineContext engineContext = mock(HoodieEngineContext.class); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieData<HoodieRecord> records = mock(HoodieData.class); + + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(metadataConfig.getRecordIndexMaxParallelism()).thenReturn(4); + + DataPartitionAndRecords init = new DataPartitionAndRecords(2, Option.empty(), records); + ExposedRecordIndexer indexer = new ExposedRecordIndexer(engineContext, writeConfig, metaClient, init); + + try (MockedStatic<org.apache.hudi.metadata.HoodieTableMetadataUtil> mockedUtil = mockStatic(org.apache.hudi.metadata.HoodieTableMetadataUtil.class)) { Review Comment: 🤖 nit: same fully-qualified `org.apache.hudi.metadata.HoodieTableMetadataUtil` usage as in `TestPartitionedRecordIndexer` — an import would clean this up and it repeats on lines 63, 111, and 116 too. ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.experimental.Accessors; + +/** + * Data class representing a file and partition with a flag. + */ +@Getter +@AllArgsConstructor +@Accessors(fluent = true) +public class FileAndPartitionFlag { + private final String partitionPath; + private final String fileName; + private final boolean flag; + Review Comment: 🤖 nit: `flag` is pretty opaque — it's unclear what this boolean represents at the call site. Something like `isDeleted` or `isAdded` (or whatever semantic it carries) would read more clearly. ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/FileSliceAndPartition.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata.model; + +import org.apache.hudi.common.model.FileSlice; + +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +/** + * Holder for a {@link FileSlice} and its partition path. 
+ */ +@AllArgsConstructor +@Getter +@Setter Review Comment: 🤖 nit: `@Setter` makes this mutable, which is inconsistent with `FileInfo` and `FileInfoAndPartition` in the same package — both use `final` fields and no setters. If mutation isn't needed here, have you considered dropping `@Setter` and making the fields `final`? ########## hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java: ########## @@ -964,7 +964,7 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws E assertEquals("0000003", completedReplaceInstant.get().requestedTime()); final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = new HashMap<>(); - metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e)); + metadataWriter.getEnabledIndexerMap().keySet().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e)); Review Comment: 🤖 nit: `e` isn't very expressive here — could you rename it to `partitionType` to match the domain concept? Same applies at the identical forEach on line 4105. ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/FileInfoAndPartition.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.model; + +import lombok.Getter; +import lombok.experimental.Accessors; + +/** + * Immutable descriptor for a file that should be scanned during index bootstrap. + */ +@Getter +@Accessors(fluent = true) +public class FileInfoAndPartition { + private final String partitionPath; + private final String name; + private final long size; + + private FileInfoAndPartition(String partitionPath, String name, long size) { + this.partitionPath = partitionPath; + this.name = name; + this.size = size; + } + + public static FileInfoAndPartition of(String partition, String path, long size) { + return new FileInfoAndPartition(partition, path, size); Review Comment: 🤖 nit: the factory parameter is named `path` but the backing field is `name` — could you align them so readers don't have to mentally translate at the call site? ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1308,20 +1270,20 @@ public static String getPartitionIdentifier(@Nonnull String relativePartitionPat */ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, Map<String, List<String>> partitionToDeletedFiles, - Map<String, Map<String, Long>> partitionToAppendedFiles, + Map<String, List<FileInfo>> partitionToAppendedFiles, String instantTime, HoodieTableMetaClient dataMetaClient, int bloomIndexParallelism, String bloomFilterType) { // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends - final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles); + final List<FileAndPartitionFlag> partitionFileFlagTupleList = fetchPartitionFileInfo(partitionToDeletedFiles, partitionToAppendedFiles); // Create records MDT int parallelism = 
Math.max(Math.min(partitionFileFlagTupleList.size(), bloomIndexParallelism), 1); return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> { - final String partitionName = partitionFileFlagTuple.f0; - final String filename = partitionFileFlagTuple.f1; - final boolean isDeleted = partitionFileFlagTuple.f2; + final String partitionName = partitionFileFlagTuple.partitionPath(); Review Comment: 🤖 nit: `partitionFileFlagTuple.flag()` doesn't read as a boolean predicate — could the accessor on `FileAndPartitionFlag` be named `isDeleted()` to make the intent obvious at the call site? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.record.HoodieRecordIndex; +import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; + +/** + * Implementation of the global {@link MetadataPartitionType#RECORD_INDEX} index + */ +@Slf4j +public class PartitionedRecordIndexer extends BaseRecordIndexer { + public PartitionedRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException { + createRecordIndexDefinition(dataTableMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, 
"true")); + Map<String, List<FileSliceAndPartition>> partitionFileSlicePairsMap = lazyLatestMergedPartitionFileSliceList.get().stream() + .collect(Collectors.groupingBy(FileSliceAndPartition::partitionPath)); + Map<String, DataPartitionAndRecords> fileGroupCountAndRecordsPairMap = new HashMap<>(partitionFileSlicePairsMap.size()); + int maxParallelismPerHudiPartition = partitionFileSlicePairsMap.isEmpty() + ? 1 : Math.max(1, dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism() / partitionFileSlicePairsMap.size()); + int totalFileGroupCount = 0; Review Comment: 🤖 nit: iterating over `keySet()` and then calling `partitionFileSlicePairsMap.get(partition)` inside is a classic two-lookup pattern. The second loop in this same method already uses `entrySet()` — could you align the first loop to do the same? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/IndexerFactory.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer; +import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer; +import org.apache.hudi.metadata.index.expression.ExpressionIndexer; +import org.apache.hudi.metadata.index.files.FilesIndexer; +import org.apache.hudi.metadata.index.partitionstats.PartitionStatsIndexer; +import org.apache.hudi.metadata.index.record.PartitionedRecordIndexer; +import org.apache.hudi.metadata.index.record.RecordIndexer; +import org.apache.hudi.metadata.index.secondary.SecondaryIndexer; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Factory for creating {@link Indexer} implementations and resolving enabled indexers + * based on table and metadata configuration. + */ Review Comment: 🤖 nit: `getIndexer` sounds like a retrieval from cache or registry; since it's always constructing a new instance, `createIndexer` would better signal its intent as a factory method. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.record.HoodieRecordIndex; +import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; + +/** Review Comment: 🤖 nit: the Javadoc says "Implementation of the **global** `RECORD_INDEX` index" — but this class is the *partitioned* variant. `RecordIndexer` is the global one. Might be worth updating to avoid confusion. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import 
org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema; + +/** + * Base implementation for record-index. 
+ */ +@Slf4j +public abstract class BaseRecordIndexer extends BaseIndexer { + + private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; + + protected BaseRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + return initializeRecordIndexPartition(null, latestMergedPartitionFileSliceList, recordIndexMaxParallelism); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + String dataPartition, + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + log.info("Initializing record index from {} file slices", latestMergedPartitionFileSliceList.size()); + HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot( + engineContext, + latestMergedPartitionFileSliceList, + recordIndexMaxParallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + dataTableWriteConfig); + + // Initialize the file groups + final int fileGroupCount = estimateFileGroupCount(records); + log.info("Initializing record index with {} file groups.", fileGroupCount); + return new DataPartitionAndRecords(fileGroupCount, Option.ofNullable(dataPartition), records); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath); + // Validate record index after commit if validation is enabled + if (dataTableWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled()) { + validateRecordIndex(records, fileGroupCount, metadataMetaClient); + } + records.unpersist(); + } 
+ + /** + * Validates the record index after bootstrap by comparing the expected record count with the actual + * record count stored in the metadata table. The validation is performed in a distributed manner + * using the engine context to count records from HFiles in parallel. + * + * @param recordIndexRecords the HoodieData containing the expected records + * @param fileGroupCount the expected number of file groups + * @param metadataMetaClient meta client for the metadata table + */ + protected void validateRecordIndex(HoodieData<HoodieRecord> recordIndexRecords, int fileGroupCount, HoodieTableMetaClient metadataMetaClient) { + String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath(); + HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient); + try { + // Use merged file slices to handle cases with pending compactions + List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, fsView, partitionName); + + // Filter to only file slices with base files and extract their storage paths + List<StoragePath> baseFilePaths = fileSlices.stream() + .filter(fs -> fs.getBaseFile().isPresent()) + .map(fs -> fs.getBaseFile().get().getStoragePath()) + .collect(Collectors.toList()); + + // Count records in a distributed manner using the engine context + long totalRecords = countRecordsInHFiles(baseFilePaths, metadataMetaClient); + long expectedRecordCount = recordIndexRecords.count(); + + ValidationUtils.checkArgument(totalRecords == expectedRecordCount, "Record Count Validation failed with " + + totalRecords + " present in record index vs the expected " + expectedRecordCount); + log.info(String.format("Record index initialized on %d shards (expected = %d) with %d records (expected = %d)", + fileSlices.size(), fileGroupCount, totalRecords, expectedRecordCount)); + } finally { + fsView.close(); + } + } + + /** + * Counts the total number of records in 
HFiles in a distributed manner. + * + * @param baseFilePaths list of storage paths to HFiles + * @param metadataMetaClient meta client for the metadata table + * @return total number of records across all HFiles + */ + private long countRecordsInHFiles(List<StoragePath> baseFilePaths, HoodieTableMetaClient metadataMetaClient) { + if (baseFilePaths.isEmpty()) { + return 0L; + } + + int parallelism = Math.min(baseFilePaths.size(), dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + StorageConfiguration<?> storageConfBroadcast = metadataMetaClient.getStorageConf(); + HoodieFileFormat baseFileFormat = metadataMetaClient.getTableConfig().getBaseFileFormat(); + + return engineContext.parallelize(baseFilePaths, parallelism) + .mapPartitions(pathIterator -> { + long count = 0L; + while (pathIterator.hasNext()) { + StoragePath path = pathIterator.next(); + try { + HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConfBroadcast); + HoodieConfig readerConfig = new HoodieConfig(); + HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage) + .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(readerConfig, path, baseFileFormat, Option.empty()); + try { + count += reader.getTotalRecords(); + } finally { + reader.close(); + } + } catch (IOException e) { + throw new HoodieIOException("Error reading total records from file " + path, e); Review Comment: 🤖 nit: small typo in the Javadoc — "context ot use" should be "context to use". ########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/bloomfilters/TestBloomFiltersIndexer.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package org.apache.hudi.metadata.index.bloomfilters; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.util.Lazy; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +class TestBloomFiltersIndexer { + + @SuppressWarnings("unchecked") + @Test + void testInitializeDataWithRealEngineContextAndIndexDataContent() throws IOException { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(getDefaultStorageConf()); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + 
HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(writeConfig.getBloomIndexParallelism()).thenReturn(8); + when(writeConfig.getBloomFilterType()).thenReturn("DYNAMIC_V0"); + when(metadataConfig.getBloomFilterIndexFileGroupCount()).thenReturn(3); + + HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) (HoodieData<?>) engineContext.parallelize( + Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_bloom", + Collections.singletonMap("f1.parquet", 11L), Collections.emptyList())), + 1); + + try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) { + mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(any(), any(), any(), any(), any(), anyInt(), any())) + .thenReturn(records); + + ExposedBloomFiltersIndexer indexer = new ExposedBloomFiltersIndexer(engineContext, writeConfig, metaClient); + List<IndexPartitionInitialization> initializationList = indexer.callGetData("001", "002", Collections.emptyMap(), Lazy.lazily(Collections::emptyList)); + assertEquals(1, initializationList.size()); + + assertEquals(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), initializationList.get(0).indexPartitionName()); + assertEquals(3, initializationList.get(0).totalFileGroups()); + List<HoodieRecord> collected = initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList(); + assertEquals(1, collected.size()); + assertEquals("p_bloom", collected.get(0).getRecordKey()); + } Review Comment: 🤖 nit: `callGetData` is a bit opaque — have you considered naming this `callBuildInitialization` or simply `buildInitialization` (overriding visibility) to make it obvious what method is being exercised? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.secondary; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; +import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices; + +/** + * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index + */ +@Slf4j +public class SecondaryIndexer extends BaseIndexer { + Review Comment: 🤖 nit: `RECORD_INDEX_AVERAGE_RECORD_SIZE = 48` is also defined in `BaseRecordIndexer`. Since `SecondaryIndexer` can't inherit it, could this constant live in a shared place (e.g. `HoodieTableMetadataUtil` or a `RecordIndexConstants` class) to avoid the duplication? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import lombok.extern.slf4j.Slf4j; + 
+import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema; + +/** + * Base implementation for record-index. + */ +@Slf4j +public abstract class BaseRecordIndexer extends BaseIndexer { + + private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; + + protected BaseRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + return initializeRecordIndexPartition(null, latestMergedPartitionFileSliceList, recordIndexMaxParallelism); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + String dataPartition, + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + log.info("Initializing record index from {} file slices", latestMergedPartitionFileSliceList.size()); + HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot( + engineContext, + latestMergedPartitionFileSliceList, + recordIndexMaxParallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + dataTableWriteConfig); + + // Initialize the file groups + final int fileGroupCount = estimateFileGroupCount(records); + log.info("Initializing record index with {} file groups.", fileGroupCount); + return new DataPartitionAndRecords(fileGroupCount, Option.ofNullable(dataPartition), records); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + 
super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath); + // Validate record index after commit if validation is enabled + if (dataTableWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled()) { + validateRecordIndex(records, fileGroupCount, metadataMetaClient); + } + records.unpersist(); + } + + /** + * Validates the record index after bootstrap by comparing the expected record count with the actual + * record count stored in the metadata table. The validation is performed in a distributed manner + * using the engine context to count records from HFiles in parallel. + * + * @param recordIndexRecords the HoodieData containing the expected records + * @param fileGroupCount the expected number of file groups + * @param metadataMetaClient meta client for the metadata table + */ + protected void validateRecordIndex(HoodieData<HoodieRecord> recordIndexRecords, int fileGroupCount, HoodieTableMetaClient metadataMetaClient) { + String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath(); + HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient); + try { + // Use merged file slices to handle cases with pending compactions + List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, fsView, partitionName); + + // Filter to only file slices with base files and extract their storage paths + List<StoragePath> baseFilePaths = fileSlices.stream() + .filter(fs -> fs.getBaseFile().isPresent()) + .map(fs -> fs.getBaseFile().get().getStoragePath()) + .collect(Collectors.toList()); + + // Count records in a distributed manner using the engine context + long totalRecords = countRecordsInHFiles(baseFilePaths, metadataMetaClient); + long expectedRecordCount = recordIndexRecords.count(); + + ValidationUtils.checkArgument(totalRecords == expectedRecordCount, "Record Count Validation failed with " + + 
totalRecords + " present in record index vs the expected " + expectedRecordCount); + log.info(String.format("Record index initialized on %d shards (expected = %d) with %d records (expected = %d)", + fileSlices.size(), fileGroupCount, totalRecords, expectedRecordCount)); + } finally { + fsView.close(); + } + } + + /** + * Counts the total number of records in HFiles in a distributed manner. + * + * @param baseFilePaths list of storage paths to HFiles Review Comment: 🤖 nit: `storageConfBroadcast` implies a Spark broadcast variable, but this is just a plain local captured by the lambda. Something like `storageConf` would avoid the misleading connotation. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.columnstats; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.index.HoodieIndexUtils.register; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; + +/** + * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata + */ +@Slf4j +public class ColumnStatsIndexer extends BaseIndexer { + private Lazy<List<String>> columnsToIndex; + Review Comment: 🤖 nit: could `columnsToIndex` be `final`? It's assigned in the constructor but then silently reassigned to `emptyList` inside `buildInitialization` when the partition map is empty — that mutation on `this` in a method is a bit surprising. Would it be cleaner to make the field final and handle the empty-partition short-circuit without touching it? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.expression; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import 
org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX; + +/** + * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index + */ +@Slf4j +public class ExpressionIndexer extends BaseIndexer { + + private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator; + + public ExpressionIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.expressionIndexRecordGenerator = expressionIndexRecordGenerator; + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException { + Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit( + EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (expressionIndexPartitionsToInit.size() != 1) { + if (expressionIndexPartitionsToInit.size() > 1) { + log.warn("Skipping expression index initialization as only one expression index " + + "bootstrap at a time is supported for now. 
Provided: {}", expressionIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = expressionIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyLatestMergedPartitionFileSliceList.get(); + List<FileInfoAndPartition> filesToIndex = new ArrayList<>(); + partitionFileSlicePairs.forEach(fsp -> { + if (fsp.fileSlice().getBaseFile().isPresent()) { + filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), fsp.fileSlice().getBaseFile().get().getPath(), fsp.fileSlice().getBaseFile().get().getFileSize())); + } + fsp.fileSlice().getLogFiles() + .forEach(hoodieLogFile + -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))); + }); + + int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); + if (partitionFileSlicePairs.isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, engineContext.emptyHoodieData())); + } + + int parallelism = Math.min(filesToIndex.size(), dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); + Lazy<HoodieSchema> tableSchemaOpt = Lazy.lazily(() -> + HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient) Review Comment: 🤖 nit: `tableSchemaOpt` reads like it holds an `Option`, but it's a `Lazy<HoodieSchema>` — and since `.get()` is called on the very next line, the `Lazy` wrapper adds no deferred-evaluation benefit here. Could you just inline it as `HoodieSchema tableSchema = HoodieTableMetadataUtil.tryResolveSchemaForTable(...).orElseThrow(...)`? 
########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/columnstats/TestColumnStatsIndexer.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. + */ + +package org.apache.hudi.metadata.index.columnstats; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.util.Lazy; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +class TestColumnStatsIndexer 
{ + + @Test + void testInitializeDataWithEmptyInputUsesEmptyHoodieData() throws IOException { + HoodieEngineContext engineContext = mock(HoodieEngineContext.class); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieData<HoodieRecord> emptyData = mock(HoodieData.class); + + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(metadataConfig.getColumnStatsIndexFileGroupCount()).thenReturn(2); + when(engineContext.emptyHoodieData()).thenReturn((HoodieData) emptyData); + + ExposedColumnStatsIndexer indexer = new ExposedColumnStatsIndexer(engineContext, writeConfig, metaClient); + List<IndexPartitionInitialization> initializationList = indexer.callGetData("001", "002", Collections.emptyMap(), Lazy.lazily(Collections::emptyList)); + assertEquals(1, initializationList.size()); + + assertEquals(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), initializationList.get(0).indexPartitionName()); + assertSame(emptyData, initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords()); + } + + @SuppressWarnings("unchecked") + @Test + void testInitializeDataWithRealEngineContextAndIndexDataContent() throws IOException { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(getDefaultStorageConf()); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class); + + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(writeConfig.getColumnStatsIndexParallelism()).thenReturn(4); + when(writeConfig.getRecordMerger()).thenReturn(recordMerger); + 
when(recordMerger.getRecordType()).thenReturn(HoodieRecord.HoodieRecordType.AVRO); + when(metadataConfig.getColumnStatsIndexFileGroupCount()).thenReturn(5); + when(metadataConfig.getMaxReaderBufferSize()).thenReturn(4096); + when(metaClient.getTableConfig()).thenReturn(tableConfig); + + Map<String, List<FileInfo>> files = new HashMap<>(); + files.put("p1", Collections.singletonList(FileInfo.of("f1.parquet", 1L))); + + HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) (HoodieData<?>) engineContext.parallelize( + Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_col", + Collections.singletonMap("f_col.parquet", 22L), Collections.emptyList())), + 1); + + try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) { + Map<String, Object> columns = new HashMap<>(); + columns.put("c1", new Object()); + mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(any(), any(), any(), eq(true), eq(Option.of(HoodieRecord.HoodieRecordType.AVRO)), any())) + .thenReturn(columns); + mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(any(), any(), any(), any(), anyInt(), anyInt(), any())) + .thenReturn(records); + + ExposedColumnStatsIndexer indexer = new ExposedColumnStatsIndexer(engineContext, writeConfig, metaClient); + List<IndexPartitionInitialization> initializationList = indexer.callGetData("001", "002", files, Lazy.lazily(Collections::emptyList)); + assertEquals(1, initializationList.size()); + + assertEquals(5, initializationList.get(0).totalFileGroups()); + List<HoodieRecord> collected = initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList(); + assertEquals(1, collected.size()); + assertEquals("p_col", collected.get(0).getRecordKey()); + } + } + + private static class ExposedColumnStatsIndexer extends ColumnStatsIndexer { + ExposedColumnStatsIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, 
HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + List<IndexPartitionInitialization> callGetData(String dataTableInstantTime, String instantTimeForPartition, Review Comment: 🤖 nit: same as `TestBloomFiltersIndexer` — `callGetData` could be renamed to `callBuildInitialization` to make it clear what's being tested. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.columnstats; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.index.HoodieIndexUtils.register; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; + +/** + * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata + */ +@Slf4j +public class ColumnStatsIndexer extends BaseIndexer { + private Lazy<List<String>> columnsToIndex; + + public ColumnStatsIndexer(HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.columnsToIndex = Lazy.lazily(() -> + new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex( + dataTableMetaClient.getTableConfig(), + dataTableWriteConfig.getMetadataConfig(), + Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)), + true, + Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()), + existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataTableMetaClient)).keySet())); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException { + final int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + if (partitionToAllFilesMap.isEmpty()) { + this.columnsToIndex = Lazy.lazily(Collections::emptyList); + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData())); + } + + if (columnsToIndex.get().isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData())); + } + + log.info("Indexing {} columns for column stats index", columnsToIndex.get().size()); + // during initialization, we need stats for base and log files. 
+ HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToAllFilesMap, + dataTableMetaClient, dataTableWriteConfig.getColumnStatsIndexParallelism(), + dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(), + columnsToIndex.get()); + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), records)); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder() + .withIndexName(PARTITION_NAME_COLUMN_STATS) + .withIndexType(PARTITION_NAME_COLUMN_STATS) + .withIndexFunction(PARTITION_NAME_COLUMN_STATS) + .withSourceFields(columnsToIndex.get()) + // Use the existing version if exists, otherwise fall back to the default version. + .withVersion(existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataTableMetaClient)) + .withIndexOptions(Collections.EMPTY_MAP) + .build(); Review Comment: 🤖 nit: `Collections.EMPTY_MAP` is the raw-type constant — the rest of the metadata module consistently uses `Collections.emptyMap()` for type safety. Could you swap it here? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java: ########## @@ -362,6 +365,41 @@ public static HoodieWriteConfig createMetadataWriteConfig( return metadataWriteConfig; } + /** + * Convert the clean action to metadata records. 
+ */ + public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext, + HoodieCleanMetadata cleanMetadata, + String instantTime, + HoodieTableMetaClient dataMetaClient, Review Comment: 🤖 nit: the parameter is called `enabledIndexBuilderMap` here but the same concept is `enabledIndexerMap` everywhere else in the codebase — could you align these to reduce the mental mapping? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad partitionInfoList = Collections.emptyList(); } } - Map<String, Map<String, Long>> partitionIdToAllFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()); - return Pair.of(partitionName, p.getFilenameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - // validate that each index is eligible to be initialized - Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator(); - while (iterator.hasNext()) { - MetadataPartitionType partitionType = iterator.next(); - if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) { - // Partition stats index cannot be enabled for a non-partitioned table - iterator.remove(); - this.enabledPartitionTypes.remove(partitionType); - } + Map<String, List<FileInfo>> partitionIdToAllFilesMap = DirectoryInfo.getPartitionToFileInfo(partitionInfoList); + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); + + // FILES partition should always be initialized first if enabled + if (!filesPartitionAvailable) { + initializeMetadataPartition(FILES, indexerMapForPartitionsToInit.get(FILES), + dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList); + hasPartitionsStateChanged = true; } - // For a fresh table, defer RLI initialization - if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() && this.enabledPartitionTypes.contains(RECORD_INDEX) - && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) { - this.enabledPartitionTypes.remove(RECORD_INDEX); - partitionsToInit.remove(RECORD_INDEX); - } - - Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); - for (MetadataPartitionType partitionType : partitionsToInit) { - // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. - String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); - String partitionTypeName = partitionType.name(); - LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition); - String relativePartitionPath; - Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; - Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)); - try { - switch (partitionType) { - case FILES: - fileGroupCountAndRecordsPair = initializeFilesPartition(partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, FILES.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case BLOOM_FILTERS: - fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case COLUMN_STATS: - Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema); - fileGroupCountAndRecordsPair = 
colStatsColumnsAndRecord.getValue(); - initializeFilegroupsAndCommit(partitionType, COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition, colStatsColumnsAndRecord.getKey()); - break; - case RECORD_INDEX: - boolean isPartitionedRLI = dataWriteConfig.isRecordLevelIndexEnabled(); - initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, lazyLatestMergedPartitionFileSliceList, isPartitionedRLI); - break; - case EXPRESSION_INDEX: - Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (expressionIndexPartitionsToInit.size() != 1) { - if (expressionIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = expressionIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case PARTITION_STATS: - // For PARTITION_STATS, COLUMN_STATS should also be enabled - if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) { - LOG.debug("Skipping partition stats initialization as column stats index is not enabled. 
Please enable {}", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()); - continue; - } - fileGroupCountAndRecordsPair = initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case SECONDARY_INDEX: - Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (secondaryIndexPartitionsToInit.size() != 1) { - if (secondaryIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = secondaryIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition(relativePartitionPath, lazyLatestMergedPartitionFileSliceList); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - default: - throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType)); - } - } catch (Exception e) { - String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; - metrics.ifPresent(m -> m.setMetric(metricKey, 1)); - String errMsg = String.format("Bootstrap on %s partition failed for %s", - partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); - LOG.error(errMsg, e); - throw new HoodieMetadataException(errMsg, e); - } + for (Map.Entry<MetadataPartitionType, Indexer> entry : + indexerMapForPartitionsToInit.entrySet().stream() + .filter(e -> e.getKey() != FILES).collect(Collectors.toList())) { + initializeMetadataPartition(entry.getKey(), entry.getValue(), + dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList); hasPartitionsStateChanged = true; } return true; } + @SneakyThrows + private void initializeMetadataPartition( + MetadataPartitionType partitionType, + Indexer indexer, + String dataTableInstantTime, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) { + String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); + // initialize metadata partitions + List<IndexPartitionInitialization> initializationList; + try { + initializationList = indexer.buildInitialization( + dataTableInstantTime, instantTimeForPartition, partitionToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + } catch (Exception e) { + String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; + metrics.ifPresent(m -> m.setMetric(metricKey, 1)); + String errMsg = String.format("Bootstrap on %s partition failed for %s", + partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); + LOG.error(errMsg, e); + throw new HoodieMetadataException(errMsg, e); + } + + if (initializationList.isEmpty()) { + LOG.info("Skip building {} index in metadata table", partitionType.getPartitionPath()); + return; + } + + ValidationUtils.checkArgument(initializationList.size() == 1, + "Only support the initialization of one partition per index type " + + "(HUDI-9358 for the feature support)"); + + IndexPartitionInitialization initialIndexPartitionData = initializationList.get(0); + final int numFileGroup = initialIndexPartitionData.totalFileGroups(); + String relativePartitionPath = initialIndexPartitionData.indexPartitionName(); + LOG.info("Initializing {} index with {} file groups", relativePartitionPath, numFileGroup); + + HoodieTimer partitionInitTimer = HoodieTimer.start(); + clearExistingMetadataPartition(relativePartitionPath); + HoodieData<HoodieRecord> records = engineContext.emptyHoodieData(); + for 
(DataPartitionAndRecords dataPartitionAndRecords: initialIndexPartitionData.dataPartitionAndRecords()) { + initializeFileGroups(dataMetaClient, partitionType, instantTimeForPartition, + dataPartitionAndRecords.numFileGroups(), relativePartitionPath, dataPartitionAndRecords.dataPartition()); + records = records.union(dataPartitionAndRecords.indexRecords()); + } + + bulkCommit(instantTimeForPartition, relativePartitionPath, records, initialIndexPartitionData.indexParser()); + + indexer.postInitialization(metadataMetaClient, records, numFileGroup, relativePartitionPath); + // initialize metadata reader + initMetadataReader(); + long totalInitTime = partitionInitTimer.endTimer(); + LOG.info("Initializing {} index in metadata table took {} in ms", partitionType, totalInitTime); Review Comment: 🤖 nit: `@SneakyThrows` silently swallows the checked `IOException` contract from `clearExistingMetadataPartition`, making it invisible to callers. Could you declare `throws IOException` on this method instead and let the exception propagate explicitly? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad partitionInfoList = Collections.emptyList(); } } - Map<String, Map<String, Long>> partitionIdToAllFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()); - return Pair.of(partitionName, p.getFilenameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - // validate that each index is eligible to be initialized - Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator(); - while (iterator.hasNext()) { - MetadataPartitionType partitionType = iterator.next(); - if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) { - // Partition stats index cannot be enabled for a non-partitioned table - iterator.remove(); - this.enabledPartitionTypes.remove(partitionType); - } + Map<String, List<FileInfo>> partitionIdToAllFilesMap = DirectoryInfo.getPartitionToFileInfo(partitionInfoList); + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); + + // FILES partition should always be initialized first if enabled + if (!filesPartitionAvailable) { + initializeMetadataPartition(FILES, indexerMapForPartitionsToInit.get(FILES), + dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + hasPartitionsStateChanged = true; } - // For a fresh table, defer RLI initialization - if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() && this.enabledPartitionTypes.contains(RECORD_INDEX) - && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) { - this.enabledPartitionTypes.remove(RECORD_INDEX); - partitionsToInit.remove(RECORD_INDEX); 
- } - - Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); - for (MetadataPartitionType partitionType : partitionsToInit) { - // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. - String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); - String partitionTypeName = partitionType.name(); - LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition); - String relativePartitionPath; - Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; - Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)); - try { - switch (partitionType) { - case FILES: - fileGroupCountAndRecordsPair = initializeFilesPartition(partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, FILES.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case BLOOM_FILTERS: - fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case COLUMN_STATS: - Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema); - fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue(); - initializeFilegroupsAndCommit(partitionType, COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition, colStatsColumnsAndRecord.getKey()); - break; - case RECORD_INDEX: - boolean isPartitionedRLI = dataWriteConfig.isRecordLevelIndexEnabled(); - initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, lazyLatestMergedPartitionFileSliceList, isPartitionedRLI); 
- break; - case EXPRESSION_INDEX: - Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (expressionIndexPartitionsToInit.size() != 1) { - if (expressionIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = expressionIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case PARTITION_STATS: - // For PARTITION_STATS, COLUMN_STATS should also be enabled - if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) { - LOG.debug("Skipping partition stats initialization as column stats index is not enabled. Please enable {}", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()); - continue; - } - fileGroupCountAndRecordsPair = initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case SECONDARY_INDEX: - Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (secondaryIndexPartitionsToInit.size() != 1) { - if (secondaryIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. 
Provided: {}", secondaryIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = secondaryIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition(relativePartitionPath, lazyLatestMergedPartitionFileSliceList); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - default: - throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType)); - } - } catch (Exception e) { - String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; - metrics.ifPresent(m -> m.setMetric(metricKey, 1)); - String errMsg = String.format("Bootstrap on %s partition failed for %s", - partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); - LOG.error(errMsg, e); - throw new HoodieMetadataException(errMsg, e); - } + for (Map.Entry<MetadataPartitionType, Indexer> entry : + indexerMapForPartitionsToInit.entrySet().stream() + .filter(e -> e.getKey() != FILES).collect(Collectors.toList())) { + initializeMetadataPartition(entry.getKey(), entry.getValue(), + dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); hasPartitionsStateChanged = true; } return true; } + @SneakyThrows + private void initializeMetadataPartition( + MetadataPartitionType partitionType, + Indexer indexer, + String dataTableInstantTime, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) { + String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); + // initialize metadata partitions + List<IndexPartitionInitialization> initializationList; + try { + initializationList = indexer.buildInitialization( + dataTableInstantTime, instantTimeForPartition, partitionToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + } catch (Exception e) { + String metricKey = 
partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; + metrics.ifPresent(m -> m.setMetric(metricKey, 1)); + String errMsg = String.format("Bootstrap on %s partition failed for %s", + partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); + LOG.error(errMsg, e); + throw new HoodieMetadataException(errMsg, e); + } + + if (initializationList.isEmpty()) { + LOG.info("Skip building {} index in metadata table", partitionType.getPartitionPath()); + return; + } + + ValidationUtils.checkArgument(initializationList.size() == 1, + "Only support the initialization of one partition per index type " + + "(HUDI-9358 for the feature support)"); + + IndexPartitionInitialization initialIndexPartitionData = initializationList.get(0); + final int numFileGroup = initialIndexPartitionData.totalFileGroups(); + String relativePartitionPath = initialIndexPartitionData.indexPartitionName(); + LOG.info("Initializing {} index with {} file groups", relativePartitionPath, numFileGroup); + + HoodieTimer partitionInitTimer = HoodieTimer.start(); + clearExistingMetadataPartition(relativePartitionPath); + HoodieData<HoodieRecord> records = engineContext.emptyHoodieData(); + for (DataPartitionAndRecords dataPartitionAndRecords: initialIndexPartitionData.dataPartitionAndRecords()) { Review Comment: 🤖 nit: collecting the entry set to a list just to iterate it feels a bit roundabout. Have you considered using `.entrySet().stream().filter(e -> e.getKey() != FILES).forEach(e -> initializeMetadataPartition(...))` to avoid the intermediate allocation? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad partitionInfoList = Collections.emptyList(); } } - Map<String, Map<String, Long>> partitionIdToAllFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()); - return Pair.of(partitionName, p.getFilenameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - // validate that each index is eligible to be initialized - Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator(); - while (iterator.hasNext()) { - MetadataPartitionType partitionType = iterator.next(); - if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) { - // Partition stats index cannot be enabled for a non-partitioned table - iterator.remove(); - this.enabledPartitionTypes.remove(partitionType); - } + Map<String, List<FileInfo>> partitionIdToAllFilesMap = DirectoryInfo.getPartitionToFileInfo(partitionInfoList); + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); + + // FILES partition should always be initialized first if enabled + if (!filesPartitionAvailable) { + initializeMetadataPartition(FILES, indexerMapForPartitionsToInit.get(FILES), + dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + hasPartitionsStateChanged = true; } - // For a fresh table, defer RLI initialization - if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() && this.enabledPartitionTypes.contains(RECORD_INDEX) - && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) { - this.enabledPartitionTypes.remove(RECORD_INDEX); - partitionsToInit.remove(RECORD_INDEX); 
- } - - Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); - for (MetadataPartitionType partitionType : partitionsToInit) { - // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. - String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); - String partitionTypeName = partitionType.name(); - LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition); - String relativePartitionPath; - Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; - Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)); - try { - switch (partitionType) { - case FILES: - fileGroupCountAndRecordsPair = initializeFilesPartition(partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, FILES.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case BLOOM_FILTERS: - fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap); - initializeFilegroupsAndCommit(partitionType, BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case COLUMN_STATS: - Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema); - fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue(); - initializeFilegroupsAndCommit(partitionType, COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition, colStatsColumnsAndRecord.getKey()); - break; - case RECORD_INDEX: - boolean isPartitionedRLI = dataWriteConfig.isRecordLevelIndexEnabled(); - initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, lazyLatestMergedPartitionFileSliceList, isPartitionedRLI); 
- break; - case EXPRESSION_INDEX: - Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (expressionIndexPartitionsToInit.size() != 1) { - if (expressionIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = expressionIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case PARTITION_STATS: - // For PARTITION_STATS, COLUMN_STATS should also be enabled - if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) { - LOG.debug("Skipping partition stats initialization as column stats index is not enabled. Please enable {}", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()); - continue; - } - fileGroupCountAndRecordsPair = initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, tableSchema); - initializeFilegroupsAndCommit(partitionType, PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - case SECONDARY_INDEX: - Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient); - if (secondaryIndexPartitionsToInit.size() != 1) { - if (secondaryIndexPartitionsToInit.size() > 1) { - LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. 
Provided: {}", secondaryIndexPartitionsToInit); - } - continue; - } - relativePartitionPath = secondaryIndexPartitionsToInit.iterator().next(); - fileGroupCountAndRecordsPair = initializeSecondaryIndexPartition(relativePartitionPath, lazyLatestMergedPartitionFileSliceList); - initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition); - break; - default: - throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType)); - } - } catch (Exception e) { - String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; - metrics.ifPresent(m -> m.setMetric(metricKey, 1)); - String errMsg = String.format("Bootstrap on %s partition failed for %s", - partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); - LOG.error(errMsg, e); - throw new HoodieMetadataException(errMsg, e); - } + for (Map.Entry<MetadataPartitionType, Indexer> entry : + indexerMapForPartitionsToInit.entrySet().stream() + .filter(e -> e.getKey() != FILES).collect(Collectors.toList())) { + initializeMetadataPartition(entry.getKey(), entry.getValue(), + dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); hasPartitionsStateChanged = true; } return true; } + @SneakyThrows + private void initializeMetadataPartition( + MetadataPartitionType partitionType, + Indexer indexer, + String dataTableInstantTime, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) { + String instantTimeForPartition = generateUniqueInstantTime(dataTableInstantTime); + // initialize metadata partitions + List<IndexPartitionInitialization> initializationList; + try { + initializationList = indexer.buildInitialization( + dataTableInstantTime, instantTimeForPartition, partitionToAllFilesMap, lazyLatestMergedPartitionFileSliceList); + } catch (Exception e) { + String metricKey = 
partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; + metrics.ifPresent(m -> m.setMetric(metricKey, 1)); + String errMsg = String.format("Bootstrap on %s partition failed for %s", + partitionType.getPartitionPath(), metadataMetaClient.getBasePath()); + LOG.error(errMsg, e); + throw new HoodieMetadataException(errMsg, e); + } + + if (initializationList.isEmpty()) { + LOG.info("Skip building {} index in metadata table", partitionType.getPartitionPath()); + return; Review Comment: 🤖 nit: `initialIndexPartitionData` reads as "the first index partition data" rather than "the initialization data for an index partition". Something like `partitionInit` or `indexPartitionInit` might be less ambiguous. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java: ########## @@ -319,12 +321,11 @@ public static ExpressionIndexComputationMetadata getExprIndexRecords( } private static Iterator<Row> getExpressionIndexRecordsIterator(HoodieReaderContext<InternalRow> readerContext, HoodieTableMetaClient metaClient, - HoodieSchema tableSchema, HoodieSchema readerSchema, HoodieWriteConfig dataWriteConfig, Pair<String, Pair<String, Long>> entry) { - String partition = entry.getKey(); - Pair<String, Long> filePathSizePair = entry.getValue(); - String filePath = filePathSizePair.getKey(); + HoodieSchema tableSchema, HoodieSchema readerSchema, HoodieWriteConfig dataWriteConfig, FileInfoAndPartition entry) { + String partition = entry.partitionPath(); Review Comment: 🤖 nit: `entry.name()` is assigned to a variable called `filePath`, suggesting it returns a full path rather than just a file name — could you rename the accessor to `filePath()` or `path()` on `FileInfoAndPartition` to match what it actually holds? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/Indexer.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.util.Lazy; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Interface to building records for initializing and updating a type of metadata or index + * in the metadata table. + * <p> + * When a new type of index is added to MetadataPartitionType, an + * implementation of the {@link Indexer} interface is required, and it + * must be added to {@link IndexerFactory}. + */ +public interface Indexer { + /** + * Generates records for initializing the index. 
+ * + * @param dataTableInstantTime instant time of the data table that the metadata table is initialized on + * @param instantTimeForPartition instant time used for initializing a specific metadata partition + * @param partitionToAllFilesMap map of partition to files + * @param lazyLatestMergedPartitionFileSliceList lazily-evaluated list of file slices for the indexer that needs it + * @return zero or more {@link IndexPartitionInitialization} entries to be initialized. + * Returning an empty list means no metadata partition needs initialization in this invocation. + * @throws IOException upon IO error + */ + List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException; Review Comment: 🤖 nit: `lazyLatestMergedPartitionFileSliceList` is quite a mouthful — something like `lazyPartitionFileSlices` carries the same meaning with less visual noise. ########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/expression/TestExpressionIndexer.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ */ + +package org.apache.hudi.metadata.index.expression; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.util.Lazy; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +class TestExpressionIndexer { + + @Test + void testSkipWhenMultipleExpressionPartitions() throws IOException { + HoodieEngineContext engineContext = mock(HoodieEngineContext.class); + HoodieWriteConfig 
writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + + try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) { + mockedUtil.when(() -> HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit(any(), any(), any())) + .thenReturn(Set.of("expr1", "expr2")); + + ExposedExpressionIndexer indexer = new ExposedExpressionIndexer(engineContext, writeConfig, metaClient, mock(ExpressionIndexRecordGenerator.class)); + List<IndexPartitionInitialization> result = indexer.callGetData("001", "002", Collections.emptyMap(), Lazy.lazily(Collections::emptyList)); + assertTrue(result.isEmpty()); + } + } + + @SuppressWarnings("unchecked") + @Test + void testInitializeWithRealEngineContextAndIndexDataContent() throws IOException { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(getDefaultStorageConf()); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + ExpressionIndexRecordGenerator generator = mock(ExpressionIndexRecordGenerator.class); + HoodieIndexDefinition definition = mock(HoodieIndexDefinition.class); + HoodieSchema tableSchema = mock(HoodieSchema.class); + HoodieSchema projectedSchema = mock(HoodieSchema.class); + + when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(metadataConfig.getExpressionIndexFileGroupCount()).thenReturn(6); + when(metadataConfig.getExpressionIndexParallelism()).thenReturn(10); + + HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) (HoodieData<?>) engineContext.parallelize( + Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_expr", + 
Collections.singletonMap("f_expr.parquet", 55L), Collections.emptyList())), + 1); + when(generator.generate(any(), any(), any(), anyInt(), any(), any(), any(), any())).thenReturn(records); + + FileSlice fileSlice = new FileSlice("p1", "001", "f1"); + fileSlice.setBaseFile(new HoodieBaseFile("file:///tmp/p1/f1.parquet")); + List<FileSliceAndPartition> fileSlices = Collections.singletonList(FileSliceAndPartition.of("p1", fileSlice)); + + try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) { + mockedUtil.when(() -> HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit(any(), any(), any())) + .thenReturn(Collections.singleton("expr_idx")); + mockedUtil.when(() -> HoodieTableMetadataUtil.getHoodieIndexDefinition("expr_idx", metaClient)).thenReturn(definition); + mockedUtil.when(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(metaClient)).thenReturn(Option.of(tableSchema)); + mockedUtil.when(() -> HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex(definition, metaClient, tableSchema)).thenReturn(projectedSchema); + + ExposedExpressionIndexer indexer = new ExposedExpressionIndexer(engineContext, writeConfig, metaClient, generator); + List<IndexPartitionInitialization> initializationList = indexer.callGetData("001", "002", new HashMap<>(), Lazy.lazily(() -> fileSlices)); + assertEquals(1, initializationList.size()); + + assertEquals("expr_idx", initializationList.get(0).indexPartitionName()); + List<HoodieRecord> collected = initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList(); + assertEquals(1, collected.size()); + assertEquals("p_expr", collected.get(0).getRecordKey()); + } + } + + private static class ExposedExpressionIndexer extends ExpressionIndexer { + ExposedExpressionIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + 
super(engineContext, dataTableWriteConfig, dataTableMetaClient, expressionIndexRecordGenerator); + } + + List<IndexPartitionInitialization> callGetData(String dataTableInstantTime, String instantTimeForPartition, Review Comment: 🤖 nit: `callGetData` doesn't quite describe what's happening — it actually delegates to `buildInitialization`. Something like `callBuildInitialization` (or just calling `buildInitialization` directly, since it's protected) would make the intent clearer. Same pattern appears in `TestFilesIndexer`, `TestPartitionStatsIndexer`, `TestRecordIndexer`, and `TestSecondaryIndexer`. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexTableVersionSix.scala: ########## @@ -18,15 +18,49 @@ package org.apache.hudi.functional -import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.metadata.MetadataPartitionType +import org.apache.spark.sql.SaveMode +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Tag +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource + +import java.util.Collections @Tag("functional-b") class TestGlobalRecordLevelIndexTableVersionSix extends TestGlobalRecordLevelIndex { override def commonOpts: Map[String, String] = super.commonOpts ++ Map( HoodieTableConfig.VERSION.key() -> "6", HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "6" ) + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + override def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = { + val hudiOpts = commonOpts ++ 
Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true") + doWriteAndValidateDataAndRecordIndex(hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + + val writeConfig = getWriteConfig(hudiOpts) + writeConfig.setSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + getHoodieWriteClient(writeConfig).dropIndex(Collections.singletonList(MetadataPartitionType.RECORD_INDEX.getPartitionPath)) + assertEquals(0, getFileGroupCountForRecordIndex(writeConfig)) + metaClient = HoodieTableMetaClient.reload(metaClient) + assertEquals(0, metaClient.getTableConfig.getMetadataPartitionsInflight.size()) Review Comment: 🤖 nit: "partition stats partition" reads oddly — looks like a duplicate word. And since the assertion below checks for size 2, the comment might mean "only files and col stats partitions should be present"? ########## hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java: ########## @@ -453,8 +454,25 @@ public static Set<String> getAllPartitionPaths() { */ public static MetadataPartitionType[] getValidValues() { Review Comment: 🤖 nit: the `// ALL_PARTITIONS is just another record type in FILES partition` comment now lives in the no-arg stub rather than in the overload that actually contains the filtering logic — might be worth moving it down to `getValidValues(HoodieTableVersion)` where it still describes something meaningful. 
########## hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java: ########## @@ -4089,20 +4089,20 @@ private void validateMetadata(SparkRDDWriteClient testClient, Option<String> ign List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient, false); // Secondary index is enabled by default but no MDT partition corresponding to it is available final boolean isPartitionStatsEnabled; - if (!metadataWriter.getEnabledPartitionTypes().contains(COLUMN_STATS)) { + if (!metadataWriter.getEnabledIndexerMap().containsKey(COLUMN_STATS)) { Review Comment: 🤖 nit: the if/else block could collapse into a single line — `final boolean isPartitionStatsEnabled = metadataWriter.getEnabledIndexerMap().containsKey(COLUMN_STATS);` ########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/record/TestPartitionedRecordIndexer.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.util.Lazy; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +class TestPartitionedRecordIndexer { + + @SuppressWarnings("unchecked") + @Test + void testInitializeWithRealEngineContextAndIndexDataContent() throws IOException { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(getDefaultStorageConf()); + HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class); + HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class); + HoodieTableMetaClient dataMetaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + + 
when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); + when(metadataConfig.getRecordIndexMaxParallelism()).thenReturn(8); + when(dataMetaClient.getTableConfig()).thenReturn(tableConfig); + + HoodieData<HoodieRecord> p1Data = (HoodieData<HoodieRecord>) (HoodieData<?>) engineContext.parallelize( + Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p1_data", + Collections.singletonMap("f1.parquet", 1L), Collections.emptyList())), + 1); + HoodieData<HoodieRecord> p2Data = (HoodieData<HoodieRecord>) (HoodieData<?>) engineContext.parallelize( + Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p2_data", + Collections.singletonMap("f2.parquet", 2L), Collections.emptyList())), + 1); + + DataPartitionAndRecords p1Init = new DataPartitionAndRecords(1, Option.of("p1"), p1Data); + DataPartitionAndRecords p2Init = new DataPartitionAndRecords(2, Option.of("p2"), p2Data); + + FileSliceAndPartition fs1 = FileSliceAndPartition.of("p1", new FileSlice("p1", "001", "f1")); + FileSliceAndPartition fs2 = FileSliceAndPartition.of("p2", new FileSlice("p2", "001", "f2")); + List<FileSliceAndPartition> input = Arrays.asList(fs1, fs2); + + ExposedPartitionedRecordIndexer indexer = new ExposedPartitionedRecordIndexer(engineContext, writeConfig, dataMetaClient, p1Init, p2Init); + + try (MockedStatic<org.apache.hudi.metadata.HoodieTableMetadataUtil> mockedUtil = mockStatic(org.apache.hudi.metadata.HoodieTableMetadataUtil.class)) { Review Comment: 🤖 nit: `org.apache.hudi.metadata.HoodieTableMetadataUtil` is repeated as a fully-qualified name in both the `try` header and the `verify` call (line 76) — could you add an import and use the simple name instead? The other test files already import it this way. 
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/index/SparkExpressionIndexRecordGenerator.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.client.utils.SparkMetadataWriterUtils; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.index.expression.HoodieSparkExpressionIndex; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.storage.StorageConfiguration; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; + +/** + * Spark implementation of {@link ExpressionIndexRecordGenerator}. Review Comment: 🤖 nit: `@Slf4j` is declared but `log` doesn't appear to be used anywhere in this class — worth removing it to keep the class tidy, or adding a log statement if one was intended. ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/DirectoryInfo.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.model; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import lombok.Getter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; + +/** + * A class which represents a directory and the files and directories inside it. + * <p> + * A {@code PartitionFileInfo} object saves the name of the partition and various properties requires of each file + * required for initializing the metadata table. Saving limited properties reduces the total memory footprint when Review Comment: 🤖 nit: the Javadoc references `PartitionFileInfo` but the class is `DirectoryInfo` — looks like a stale copy-paste. Could you update it to match? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
