hudi-agent commented on code in PR #18372: URL: https://github.com/apache/hudi/pull/18372#discussion_r3308175358
########## hudi-common/src/main/java/org/apache/hudi/metadata/model/FileInfoAndPartition.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.model; + +import lombok.Getter; +import lombok.experimental.Accessors; + +import java.io.Serializable; + +/** + * Immutable descriptor for a file that should be scanned during index bootstrap. + */ +@Getter +@Accessors(fluent = true) +public class FileInfoAndPartition implements Serializable { + private static final long serialVersionUID = 1L; + + private final String partitionPath; + private final String path; + private final long size; Review Comment: 🤖 nit: could you rename `path` to `fileName` here? The sibling class `FileInfo` in the same package already uses `fileName` for the same concept (a file name within a partition, not a full path), and in practice this field is populated from `new StoragePath(path).getName()`. Calling it `path` while it holds only a name could mislead a future reader. <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/index/SparkExpressionIndexRecordGenerator.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.client.utils.SparkMetadataWriterUtils; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.exception.HoodieSchemaNotFoundException; +import org.apache.hudi.index.expression.HoodieSparkExpressionIndex; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; + +/** + * Spark implementation of {@link ExpressionIndexRecordGenerator}. 
+ */ +public class SparkExpressionIndexRecordGenerator implements ExpressionIndexRecordGenerator { + private final HoodieEngineContext engineContext; + private final HoodieWriteConfig dataWriteConfig; + + public SparkExpressionIndexRecordGenerator(HoodieEngineContext engineContext, + HoodieWriteConfig dataWriteConfig) { + this.engineContext = engineContext; + this.dataWriteConfig = dataWriteConfig; + } + + @Override + public EngineType getEngineType() { + return EngineType.SPARK; + } + + @Override + public HoodieData<HoodieRecord> buildInitialization( + List<FileInfoAndPartition> filesToIndex, + HoodieIndexDefinition indexDefinition, + HoodieTableMetaClient metaClient, + int parallelism, HoodieSchema tableSchema, + HoodieSchema readerSchema, + StorageConfiguration<?> storageConf, + String instantTime) { + if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) { + throw new HoodieNotSupportedException("Hudi tables prior to version 8 do not support expression index."); + } + HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata expressionIndexComputationMetadata = SparkMetadataWriterUtils.getExprIndexRecords( + filesToIndex, indexDefinition, metaClient, parallelism, tableSchema, readerSchema, instantTime, engineContext, dataWriteConfig, + Option.of(rangeMetadata -> + HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(rangeMetadata, true, Option.of(indexDefinition.getIndexName())))); + HoodieData<HoodieRecord> exprIndexRecords = expressionIndexComputationMetadata.getExpressionIndexRecords(); + if (indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS)) { + exprIndexRecords = exprIndexRecords.union(expressionIndexComputationMetadata.getPartitionStatRecordsOpt().get()); + } + return exprIndexRecords; + } + + @Override + public HoodieData<HoodieRecord> buildUpdate( + HoodieTableMetaClient dataMetaClient, + HoodieTableMetadata tableMetadata, + HoodieCommitMetadata commitMetadata, + String 
indexPartition, + String instantTime) { + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, dataMetaClient); + boolean isExprIndexUsingColumnStats = indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS); + Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt = Option.empty(); + if (isExprIndexUsingColumnStats) { + // Fetch column range metadata for affected partitions in the commit + HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> exprIndexPartitionStatUpdates = + SparkMetadataWriterUtils.getExpressionIndexPartitionStatsForExistingFiles( + commitMetadata, indexPartition, engineContext, tableMetadata, dataMetaClient, dataWriteConfig.getMetadataConfig(), + Option.of(dataWriteConfig.getRecordMerger().getRecordType()), instantTime, dataWriteConfig) + .flatMapValues(List::iterator); + // The function below merges the column range metadata from the updated data with latest column range metadata of affected partition computed above + partitionRecordsFunctionOpt = Option.of(rangeMetadata -> + HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(exprIndexPartitionStatUpdates.union(rangeMetadata), true, Option.of(indexDefinition.getIndexName()))); + } + + // Step 1: Generate partition name, file path and size triplets from the newly created files in the commit metadata + List<FileInfoAndPartition> filesToIndex = new ArrayList<>(); + commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> writeStats.forEach(writeStat -> filesToIndex.add( + FileInfoAndPartition.of(writeStat.getPartitionPath(), new StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileSizeInBytes())))); + int parallelism = Math.min(filesToIndex.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); + HoodieSchema tableSchema = null; + try { + 
tableSchema = new TableSchemaResolver(dataMetaClient).getTableSchema(); + } catch (Exception e) { + throw new HoodieSchemaNotFoundException("No schema found for table at " + dataMetaClient.getBasePath()); Review Comment: 🤖 This catch swallows the original exception cause and converts any `Exception` (IO, parse error, network, etc.) into a `HoodieSchemaNotFoundException`, which could be misleading. Could you pass `e` as the cause (`new HoodieSchemaNotFoundException(msg, e)`) and consider narrowing the catch? The previous implementation propagated the underlying exception, so this is a debuggability regression. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.secondary; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; +import static 
org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; +import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords; +import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices; + +/** + * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index + */ +@Slf4j +public class SecondaryIndexer extends BaseIndexer { + + public SecondaryIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (secondaryIndexPartitionsToInit.size() != 1) { + if (secondaryIndexPartitionsToInit.size() > 1) { + log.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. 
Provided: {}", secondaryIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = secondaryIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Secondary Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get(); + + int parallelism = Math.min(partitionFileSlicePairs.size(), dataTableWriteConfig.getMetadataConfig().getSecondaryIndexParallelism()); + HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices( + engineContext, + partitionFileSlicePairs, + parallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + indexDefinition, + dataTableWriteConfig.getProps()); + + // Initialize the file groups - using the same estimation logic as that of record index + final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records::count, + RECORD_INDEX_AVERAGE_RECORD_SIZE, dataTableWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount(), + dataTableWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount(), dataTableWriteConfig.getRecordIndexGrowthFactor(), + dataTableWriteConfig.getRecordIndexMaxFileGroupSizeBytes()); + + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, records)); + } + + @Override + public List<IndexPartitionAndRecords> buildUpdate( + String instantTime, + HoodieBackedTableMetadata tableMetadata, + Lazy<HoodieTableFileSystemView> lazyFileSystemView, + HoodieCommitMetadata commitMetadata) { + if (!SECONDARY_INDEX.isMetadataPartitionAvailable(dataTableMetaClient)) { + return Collections.emptyList(); + } + // If write operation type based on commit metadata is COMPACT or CLUSTER then no need to update, + // because these operations do not change the secondary key - record key mapping. 
+ WriteOperationType operationType = commitMetadata.getOperationType(); + if (operationType.isInsertOverwriteOrDeletePartition()) { + throw new HoodieIndexException(String.format("Can not perform operation %s on secondary index", operationType)); + } else if (operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.CLUSTER) { + return Collections.emptyList(); + } + + return dataTableMetaClient.getTableConfig().getMetadataPartitions() + .stream() + .filter(partition -> partition.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX)) + .map(partition -> { + HoodieData<HoodieRecord> secondaryIndexRecords; + try { + secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, instantTime); + } catch (Exception e) { + throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e); + } + return IndexPartitionAndRecords.of(partition, secondaryIndexRecords); + }).collect(Collectors.toList()); + } + + @Override + public List<IndexPartitionAndRecords> buildClean(String instantTime, HoodieCleanMetadata cleanMetadata) { + return Collections.emptyList(); + } + + private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) { + List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + // Return early if there are no write stats. Review Comment: 🤖 nit: the comment only mentions the empty-write-stats case, but the `if` on the next line also guards against `isCompactionOrClustering` — which `buildUpdate` already screens for before reaching this method. Could you either drop the redundant compaction check or update the comment to explain why this extra guard exists? <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/UnsupportedExpressionIndexRecordGenerator.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.storage.StorageConfiguration; + +import java.util.List; + +/** + * Fallback {@link ExpressionIndexRecordGenerator} that throws a not-supported exception + * when expression index bootstrap is requested for unsupported engines. 
+ */ +public class UnsupportedExpressionIndexRecordGenerator implements ExpressionIndexRecordGenerator { + + private final EngineType engineType; + + public UnsupportedExpressionIndexRecordGenerator(EngineType engineType) { + this.engineType = engineType; + } + + @Override + public EngineType getEngineType() { + return engineType; + } + + @Override + public HoodieData<HoodieRecord> buildInitialization( + List<FileInfoAndPartition> filesToIndex, + HoodieIndexDefinition indexDefinition, + HoodieTableMetaClient metaClient, + int parallelism, + HoodieSchema tableSchema, + HoodieSchema readerSchema, + StorageConfiguration<?> storageConf, + String instantTime) { + if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) { + throw new HoodieNotSupportedException("Table version 7 and below does not support expression index"); + } + throw new HoodieNotSupportedException(engineType + " engine does not support building expression index yet"); + } + + @Override + public HoodieData<HoodieRecord> buildUpdate( + HoodieTableMetaClient metaClient, + HoodieTableMetadata tableMetadata, + HoodieCommitMetadata commitMetadata, + String indexPartition, + String instantTime) { + if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) { + throw new HoodieNotSupportedException("Table version 6 and below does not support expression index"); Review Comment: 🤖 nit: the error message says "Table version 6 and below" but the guard is `lesserThan(HoodieTableVersion.EIGHT)`, which covers versions ≤ 7 — the message in `buildInitialization` above correctly says "7 and below". Could you align this one to say "Table version 7 and below does not support expression index"? <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.record.HoodieRecordIndex; +import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; + +/** + * Implementation of the partitioned {@link MetadataPartitionType#RECORD_INDEX} index + */ +@Slf4j +public class PartitionedRecordIndexer extends BaseRecordIndexer { + public PartitionedRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + createRecordIndexDefinition(dataTableMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, "true")); + 
Map<String, List<FileSliceAndPartition>> partitionFileSlicePairsMap = lazyPartitionFileSlices.get().stream() + .collect(Collectors.groupingBy(FileSliceAndPartition::partitionPath)); + Map<String, DataPartitionAndRecords> fileGroupCountAndRecordsPairMap = new HashMap<>(partitionFileSlicePairsMap.size()); Review Comment: 🤖 nit: `fileGroupCountAndRecordsPairMap` is quite a mouthful — it describes the value type (`DataPartitionAndRecords`) rather than the variable's role. Something like `partitionDataMap` would be more concise and equally clear here. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.experimental.Accessors; + +import java.io.Serializable; + +/** + * Data class representing a file and partition with a flag. 
+ */ +@Getter +@AllArgsConstructor +@Accessors(fluent = true) +public class FileAndPartitionFlag implements Serializable { + private static final long serialVersionUID = 1L; Review Comment: 🤖 nit: the `Flag` suffix in `FileAndPartitionFlag` is pretty opaque — a reader seeing `List<FileAndPartitionFlag>` in a method signature gets no signal about what the flag represents. Something like `PartitionFileEntry` (with the deletion state carried by `isDeleted`) or `FileWithDeleteStatus` would make the intent clearer at the call site. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java: ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.columnstats; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.HoodieIndexVersion; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath; +import static org.apache.hudi.index.HoodieIndexUtils.register; +import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnsToIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; + +/** + * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata + */ +@Slf4j +public class ColumnStatsIndexer extends BaseIndexer { + private final Lazy<List<String>> columnsToIndex; + + public ColumnStatsIndexer(HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.columnsToIndex = Lazy.lazily(() -> + new ArrayList<>(getColumnsToIndex( + dataTableMetaClient.getTableConfig(), + dataTableWriteConfig.getMetadataConfig(), + Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)), + true, + Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()), + existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataTableMetaClient)).keySet())); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + final int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + // columnsToIndex can be empty if meta fields are disabled and cols to index is not explicitly overridden. 
+ if (partitionToAllFilesMap.isEmpty() || columnsToIndex.get().isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData())); + } + + log.info("Indexing {} columns for column stats index", columnsToIndex.get().size()); + // during initialization, we need stats for base and log files. + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToAllFilesMap, + dataTableMetaClient, dataTableWriteConfig.getColumnStatsIndexParallelism(), + dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(), + columnsToIndex.get()); + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), records)); + } + + @Override + public List<IndexPartitionAndRecords> buildUpdate( + String instantTime, + HoodieBackedTableMetadata tableMetadata, + Lazy<HoodieTableFileSystemView> lazyFileSystemView, + HoodieCommitMetadata commitMetadata) { + final HoodieData<HoodieRecord> records = convertMetadataToColumnStatsRecords(commitMetadata, engineContext, + dataTableMetaClient, dataTableWriteConfig.getMetadataConfig(), Option.of(dataTableWriteConfig.getRecordMerger().getRecordType())); + return Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(), records)); + } + + @Override + public List<IndexPartitionAndRecords> buildClean(String instantTime, HoodieCleanMetadata cleanMetadata) { + final HoodieData<HoodieRecord> records = + convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataTableMetaClient, + dataTableWriteConfig.getMetadataConfig(), Option.of(dataTableWriteConfig.getRecordMerger().getRecordType())); + return Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(), records)); + } + + @Override + public List<IndexPartitionAndRecords> buildRestore(String instantTime, 
List<String> deletedPartitions, Map<String, List<FileInfo>> filesAdded, Map<String, List<String>> filesDeleted) { + if (filesDeleted.isEmpty() && filesAdded.isEmpty()) { + return Collections.emptyList(); + } + Lazy<Option<HoodieSchema>> tableSchema = + Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)); + final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex( Review Comment: 🤖 nit: the local `columnsToIndex` shadows the class field `this.columnsToIndex` (a `Lazy<List<String>>`) but is computed with a different `includeMetaFields=false` flag — could you rename it to something like `restoreColumnsToIndex` to make the distinction clear? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata.index.expression; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieIndexMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer; +import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer; +import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath; + +/** + * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index + */ +@Slf4j +public class ExpressionIndexer extends BaseIndexer { + + private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator; + + public ExpressionIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.expressionIndexRecordGenerator = expressionIndexRecordGenerator; + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException { + Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit( + EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (expressionIndexPartitionsToInit.size() != 1) { + if (expressionIndexPartitionsToInit.size() > 1) { + log.warn("Skipping expression index initialization as only one expression index " + + "bootstrap at a time is supported for now. 
Provided: {}", expressionIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = expressionIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get(); + List<FileInfoAndPartition> filesToIndex = new ArrayList<>(); + partitionFileSlicePairs.forEach(fsp -> { + if (fsp.fileSlice().getBaseFile().isPresent()) { + filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), fsp.fileSlice().getBaseFile().get().getPath(), fsp.fileSlice().getBaseFile().get().getFileSize())); + } + fsp.fileSlice().getLogFiles() + .forEach(hoodieLogFile + -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))); + }); + + int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); + if (filesToIndex.isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, engineContext.emptyHoodieData())); + } + + int parallelism = Math.min(filesToIndex.size(), dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); + HoodieSchema tableSchema = + HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient) + .orElseThrow(() -> new HoodieMetadataException("Table schema is not available for expression index initialization")); + HoodieSchema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataTableMetaClient, tableSchema); + + HoodieData<HoodieRecord> records = expressionIndexRecordGenerator.buildInitialization( + filesToIndex, indexDefinition, dataTableMetaClient, parallelism, + tableSchema, readerSchema, engineContext.getStorageConf(), 
dataTableInstantTime); + + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, records)); + } + + @Override + public List<IndexPartitionAndRecords> buildUpdate(String instantTime, HoodieBackedTableMetadata tableMetadata, Lazy<HoodieTableFileSystemView> lazyFileSystemView, + HoodieCommitMetadata commitMetadata) { + if (!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(dataTableMetaClient)) { + log.info("Don't need to update expression index, since no expression index is available"); + return Collections.emptyList(); + } + return dataTableMetaClient.getTableConfig().getMetadataPartitions() + .stream() + .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) + .map(partition -> { + HoodieData<HoodieRecord> expressionIndexRecords; + try { + expressionIndexRecords = expressionIndexRecordGenerator.buildUpdate(dataTableMetaClient, tableMetadata, commitMetadata, partition, instantTime); + } catch (Exception e) { + throw new HoodieMetadataException(String.format("Failed to get expression index updates for partition %s", partition), e); + } + return IndexPartitionAndRecords.of(partition, expressionIndexRecords); + }).collect(Collectors.toList()); + } + + @Override + public List<IndexPartitionAndRecords> buildClean(String instantTime, HoodieCleanMetadata cleanMetadata) { + Option<HoodieIndexMetadata> indexMetadata = dataTableMetaClient.getIndexMetadata(); + if (indexMetadata.isEmpty()) { + throw new HoodieMetadataException("Expression index metadata not found"); + } + List<IndexPartitionAndRecords> indexRecordsList = new ArrayList<>(); + HoodieIndexMetadata metadata = indexMetadata.get(); + Map<String, HoodieIndexDefinition> indexDefinitions = metadata.getIndexDefinitions(); + if (indexDefinitions.isEmpty()) { + throw new HoodieMetadataException("Expression index metadata not found"); Review Comment: 🤖 nit: this throws the same message `"Expression index metadata not 
found"` as the check six lines above, but for a different condition (definitions map is empty vs. metadata object absent) — something like `"Expression index has no index definitions"` would make it easier to pinpoint which check failed. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
