hudi-agent commented on code in PR #18372:
URL: https://github.com/apache/hudi/pull/18372#discussion_r3308175358


##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/FileInfoAndPartition.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+
+/**
+ * Immutable descriptor for a file that should be scanned during index bootstrap.
+ */
+@Getter
+@Accessors(fluent = true)
+public class FileInfoAndPartition implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final String partitionPath;
+  private final String path;
+  private final long size;

Review Comment:
   🤖 nit: could you rename `path` to `fileName` here? The sibling class 
`FileInfo` in the same package already uses `fileName` for the same concept (a 
file name within a partition, not a full path), and in practice this field is 
populated from `new StoragePath(path).getName()`. Calling it `path` while it 
holds only a name could mislead a future reader.
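   A minimal plain-Java sketch of the rename. `FileInfoAndPartitionSketch` and `fromFullPath` are hypothetical names. Hand-written fluent getters stand in for Lombok's `@Getter` + `@Accessors(fluent = true)`, and the `of` factory is assumed because the diff is cut off before the constructor. `fromFullPath` keeps only the last path segment as a stand-in for `StoragePath#getName()`.

```java
import java.io.Serializable;

/**
 * Sketch of FileInfoAndPartition with `path` renamed to `fileName`.
 * Hypothetical: plain fluent getters replace Lombok's generated accessors.
 */
final class FileInfoAndPartitionSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String partitionPath;
  private final String fileName; // name within the partition, not a full path
  private final long size;

  private FileInfoAndPartitionSketch(String partitionPath, String fileName, long size) {
    this.partitionPath = partitionPath;
    this.fileName = fileName;
    this.size = size;
  }

  /** Assumed factory; the real class's constructor is not shown in the diff. */
  static FileInfoAndPartitionSketch of(String partitionPath, String fileName, long size) {
    return new FileInfoAndPartitionSketch(partitionPath, fileName, size);
  }

  /** Hypothetical helper: keeps only the last '/'-separated segment, like StoragePath#getName(). */
  static FileInfoAndPartitionSketch fromFullPath(String partitionPath, String fullPath, long size) {
    return of(partitionPath, fullPath.substring(fullPath.lastIndexOf('/') + 1), size);
  }

  String partitionPath() { return partitionPath; }

  String fileName() { return fileName; }

  long size() { return size; }
}
```

   One caveat from the diff itself: `SparkExpressionIndexRecordGenerator#buildUpdate` currently passes `new StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString()`, which is a full path, into this field. That call site would need to strip the name (or the field's semantics would need to be settled) before the new name is accurate everywhere.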
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/index/SparkExpressionIndexRecordGenerator.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.exception.HoodieSchemaNotFoundException;
+import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
+
+/**
+ * Spark implementation of {@link ExpressionIndexRecordGenerator}.
+ */
+public class SparkExpressionIndexRecordGenerator implements ExpressionIndexRecordGenerator {
+  private final HoodieEngineContext engineContext;
+  private final HoodieWriteConfig dataWriteConfig;
+
+  public SparkExpressionIndexRecordGenerator(HoodieEngineContext engineContext,
+                                             HoodieWriteConfig dataWriteConfig) {
+    this.engineContext = engineContext;
+    this.dataWriteConfig = dataWriteConfig;
+  }
+
+  @Override
+  public EngineType getEngineType() {
+    return EngineType.SPARK;
+  }
+
+  @Override
+  public HoodieData<HoodieRecord> buildInitialization(
+      List<FileInfoAndPartition> filesToIndex,
+      HoodieIndexDefinition indexDefinition,
+      HoodieTableMetaClient metaClient,
+      int parallelism, HoodieSchema tableSchema,
+      HoodieSchema readerSchema,
+      StorageConfiguration<?> storageConf,
+      String instantTime) {
+    if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+      throw new HoodieNotSupportedException("Hudi tables prior to version 8 do not support expression index.");
+    }
+    HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata expressionIndexComputationMetadata = SparkMetadataWriterUtils.getExprIndexRecords(
+        filesToIndex, indexDefinition, metaClient, parallelism, tableSchema, readerSchema, instantTime, engineContext, dataWriteConfig,
+        Option.of(rangeMetadata ->
+            HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(rangeMetadata, true, Option.of(indexDefinition.getIndexName()))));
+    HoodieData<HoodieRecord> exprIndexRecords = expressionIndexComputationMetadata.getExpressionIndexRecords();
+    if (indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS)) {
+      exprIndexRecords = exprIndexRecords.union(expressionIndexComputationMetadata.getPartitionStatRecordsOpt().get());
+    }
+    return exprIndexRecords;
+  }
+
+  @Override
+  public HoodieData<HoodieRecord> buildUpdate(
+      HoodieTableMetaClient dataMetaClient,
+      HoodieTableMetadata tableMetadata,
+      HoodieCommitMetadata commitMetadata,
+      String indexPartition,
+      String instantTime) {
+    HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, dataMetaClient);
+    boolean isExprIndexUsingColumnStats = indexDefinition.getIndexType().equals(PARTITION_NAME_COLUMN_STATS);
+    Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt = Option.empty();
+    if (isExprIndexUsingColumnStats) {
+      // Fetch column range metadata for affected partitions in the commit
+      HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> exprIndexPartitionStatUpdates =
+          SparkMetadataWriterUtils.getExpressionIndexPartitionStatsForExistingFiles(
+                  commitMetadata, indexPartition, engineContext, tableMetadata, dataMetaClient, dataWriteConfig.getMetadataConfig(),
+                  Option.of(dataWriteConfig.getRecordMerger().getRecordType()), instantTime, dataWriteConfig)
+              .flatMapValues(List::iterator);
+      // The function below merges the column range metadata from the updated data with latest column range metadata of affected partition computed above
+      partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
+          HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(exprIndexPartitionStatUpdates.union(rangeMetadata), true, Option.of(indexDefinition.getIndexName())));
+    }
+
+    // Step 1: Generate partition name, file path and size triplets from the newly created files in the commit metadata
+    List<FileInfoAndPartition> filesToIndex = new ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> writeStats.forEach(writeStat -> filesToIndex.add(
+        FileInfoAndPartition.of(writeStat.getPartitionPath(), new StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileSizeInBytes()))));
+    int parallelism = Math.min(filesToIndex.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+    HoodieSchema tableSchema = null;
+    try {
+      tableSchema = new TableSchemaResolver(dataMetaClient).getTableSchema();
+    } catch (Exception e) {
+      throw new HoodieSchemaNotFoundException("No schema found for table at " + dataMetaClient.getBasePath());

Review Comment:
   🤖 This catch swallows the original exception cause and converts any 
`Exception` (IO, parse error, network, etc.) into a 
`HoodieSchemaNotFoundException`, which could be misleading. Could you pass `e` 
as the cause (`new HoodieSchemaNotFoundException(msg, e)`) and consider 
narrowing the catch? The previous implementation propagated the underlying 
exception, so this is a debuggability regression.
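   A minimal sketch of the suggested change. `SchemaNotFoundException`, `SchemaLookup`, and `resolveSchema` are hypothetical stand-ins for `HoodieSchemaNotFoundException` and `TableSchemaResolver#getTableSchema`; whether the Hudi exception offers a `(String, Throwable)` constructor should be confirmed in the codebase. The stand-in resolver is assumed to throw only `IOException`, which is what allows the catch to be narrowed here.

```java
import java.io.IOException;

/** Hypothetical stand-in for HoodieSchemaNotFoundException with a cause-accepting constructor. */
class SchemaNotFoundException extends RuntimeException {
  SchemaNotFoundException(String message, Throwable cause) {
    super(message, cause);
  }
}

final class SchemaLookup {
  /** Hypothetical resolver that fails the way an I/O-backed schema lookup might. */
  static String resolveSchema(String basePath) throws IOException {
    throw new IOException("cannot read commit metadata under " + basePath);
  }

  static String tableSchemaOrThrow(String basePath) {
    try {
      return resolveSchema(basePath);
    } catch (IOException e) { // narrowed: only the failure mode we expect from the lookup
      // Pass `e` as the cause so the root failure stays in the stack trace.
      throw new SchemaNotFoundException("No schema found for table at " + basePath, e);
    }
  }
}
```

   If the real resolver declares `throws Exception`, narrowing may not be possible without touching its signature, but attaching the cause is still worthwhile on its own.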
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.secondary;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;
+import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords;
+import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;
+
+/**
+ * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index
+ */
+@Slf4j
+public class SecondaryIndexer extends BaseIndexer {
+
+  public SecondaryIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap,
+                                                                Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException {
+    Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient);
+    if (secondaryIndexPartitionsToInit.size() != 1) {
+      if (secondaryIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = secondaryIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Secondary Index definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get();
+
+    int parallelism = Math.min(partitionFileSlicePairs.size(), dataTableWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
+    HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+        engineContext,
+        partitionFileSlicePairs,
+        parallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        indexDefinition,
+        dataTableWriteConfig.getProps());
+
+    // Initialize the file groups - using the same estimation logic as that of record index
+    final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records::count,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, dataTableWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount(),
+        dataTableWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount(), dataTableWriteConfig.getRecordIndexGrowthFactor(),
+        dataTableWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+    return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildUpdate(
+      String instantTime,
+      HoodieBackedTableMetadata tableMetadata,
+      Lazy<HoodieTableFileSystemView> lazyFileSystemView,
+      HoodieCommitMetadata commitMetadata) {
+    if (!SECONDARY_INDEX.isMetadataPartitionAvailable(dataTableMetaClient)) {
+      return Collections.emptyList();
+    }
+    // If write operation type based on commit metadata is COMPACT or CLUSTER then no need to update,
+    // because these operations do not change the secondary key - record key mapping.
+    WriteOperationType operationType = commitMetadata.getOperationType();
+    if (operationType.isInsertOverwriteOrDeletePartition()) {
+      throw new HoodieIndexException(String.format("Can not perform operation %s on secondary index", operationType));
+    } else if (operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.CLUSTER) {
+      return Collections.emptyList();
+    }
+
+    return dataTableMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> partition.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .map(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, instantTime);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
+          }
+          return IndexPartitionAndRecords.of(partition, secondaryIndexRecords);
+        }).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildClean(String instantTime, HoodieCleanMetadata cleanMetadata) {
+    return Collections.emptyList();
+  }
+
+  private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) {
+    List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+    // Return early if there are no write stats.

Review Comment:
   🤖 nit: the comment only mentions the empty-write-stats case, but the `if` on 
the next line also guards against `isCompactionOrClustering` — which 
`buildUpdate` already screens for before reaching this method. Could you either 
drop the redundant compaction check or update the comment to explain why this 
extra guard exists?
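   One way to resolve this, sketched with hypothetical stand-ins (`SecondaryIndexUpdateSketch`, `OpType`, `updatesFor`, and plain `String` write stats are not Hudi types): because `buildUpdate` already returns early for `COMPACT` and `CLUSTER`, the helper keeps only the empty-write-stats guard, and its comment then matches the code.

```java
import java.util.Collections;
import java.util.List;

final class SecondaryIndexUpdateSketch {
  /** Hypothetical subset of WriteOperationType. */
  enum OpType { UPSERT, INSERT, COMPACT, CLUSTER }

  /** Mirrors buildUpdate: compaction and clustering never reach the helper. */
  static List<String> buildUpdate(OpType op, List<String> writeStats) {
    if (op == OpType.COMPACT || op == OpType.CLUSTER) {
      return Collections.emptyList();
    }
    return updatesFor(writeStats);
  }

  /** Helper: the caller has already filtered out compaction and clustering. */
  private static List<String> updatesFor(List<String> writeStats) {
    // Return early if there are no write stats.
    if (writeStats.isEmpty()) {
      return Collections.emptyList();
    }
    return writeStats; // placeholder for real secondary index record generation
  }
}
```

   If the compaction check is kept on purpose, for example to protect a future caller that bypasses `buildUpdate`, the comment should say that instead.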
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/UnsupportedExpressionIndexRecordGenerator.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import java.util.List;
+
+/**
+ * Fallback {@link ExpressionIndexRecordGenerator} that throws a not-supported exception
+ * when expression index bootstrap is requested for unsupported engines.
+ */
+public class UnsupportedExpressionIndexRecordGenerator implements ExpressionIndexRecordGenerator {
+
+  private final EngineType engineType;
+
+  public UnsupportedExpressionIndexRecordGenerator(EngineType engineType) {
+    this.engineType = engineType;
+  }
+
+  @Override
+  public EngineType getEngineType() {
+    return engineType;
+  }
+
+  @Override
+  public HoodieData<HoodieRecord> buildInitialization(
+      List<FileInfoAndPartition> filesToIndex,
+      HoodieIndexDefinition indexDefinition,
+      HoodieTableMetaClient metaClient,
+      int parallelism,
+      HoodieSchema tableSchema,
+      HoodieSchema readerSchema,
+      StorageConfiguration<?> storageConf,
+      String instantTime) {
+    if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+      throw new HoodieNotSupportedException("Table version 7 and below does not support expression index");
+    }
+    throw new HoodieNotSupportedException(engineType + " engine does not support building expression index yet");
+  }
+
+  @Override
+  public HoodieData<HoodieRecord> buildUpdate(
+      HoodieTableMetaClient metaClient,
+      HoodieTableMetadata tableMetadata,
+      HoodieCommitMetadata commitMetadata,
+      String indexPartition,
+      String instantTime) {
+    if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+      throw new HoodieNotSupportedException("Table version 6 and below does not support expression index");

Review Comment:
   🤖 nit: the error message says "Table version 6 and below" but the guard is 
`lesserThan(HoodieTableVersion.EIGHT)`, which covers versions ≤ 7 — the message 
in `buildInitialization` above correctly says "7 and below". Could you align 
this one to say "Table version 7 and below does not support expression index"?
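   A sketch of one way to keep the two messages from drifting again: derive the text from the same minimum version the guard checks, in a helper shared by `buildInitialization` and `buildUpdate`. `ExpressionIndexVersionCheck`, `TableVersion`, `MIN_EXPRESSION_INDEX_VERSION`, and `validate` are hypothetical; the enum only mimics the `lesserThan` ordering of `HoodieTableVersion`, and `UnsupportedOperationException` stands in for `HoodieNotSupportedException`.

```java
final class ExpressionIndexVersionCheck {
  /** Hypothetical stand-in for HoodieTableVersion; only the ordering matters here. */
  enum TableVersion {
    SIX(6), SEVEN(7), EIGHT(8);

    final int code;

    TableVersion(int code) {
      this.code = code;
    }

    boolean lesserThan(TableVersion other) {
      return code < other.code;
    }
  }

  /** Single source of truth for the guard and for the error text. */
  static final TableVersion MIN_EXPRESSION_INDEX_VERSION = TableVersion.EIGHT;

  /** Shared by both build paths so their messages cannot disagree. */
  static void validate(TableVersion tableVersion) {
    if (tableVersion.lesserThan(MIN_EXPRESSION_INDEX_VERSION)) {
      throw new UnsupportedOperationException("Table version "
          + (MIN_EXPRESSION_INDEX_VERSION.code - 1) + " and below does not support expression index");
    }
  }
}
```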
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.record.HoodieRecordIndex;
+import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * Implementation of the partitioned {@link MetadataPartitionType#RECORD_INDEX} index
+ */
+@Slf4j
+public class PartitionedRecordIndexer extends BaseRecordIndexer {
+  public PartitionedRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig,
+                                     HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap,
+                                                                Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException {
+    createRecordIndexDefinition(dataTableMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, "true"));
+    Map<String, List<FileSliceAndPartition>> partitionFileSlicePairsMap = lazyPartitionFileSlices.get().stream()
+        .collect(Collectors.groupingBy(FileSliceAndPartition::partitionPath));
+    Map<String, DataPartitionAndRecords> fileGroupCountAndRecordsPairMap = new HashMap<>(partitionFileSlicePairsMap.size());

Review Comment:
   🤖 nit: `fileGroupCountAndRecordsPairMap` is quite a mouthful — it describes 
the value type (`DataPartitionAndRecords`) rather than the variable's role. 
Something like `partitionDataMap` would be more concise and equally clear here.
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+
+/**
+ * Data class representing a file and partition with a flag.
+ */
+@Getter
+@AllArgsConstructor
+@Accessors(fluent = true)
+public class FileAndPartitionFlag implements Serializable {
+  private static final long serialVersionUID = 1L;

Review Comment:
   🤖 nit: the `Flag` suffix in `FileAndPartitionFlag` is pretty opaque — a 
reader seeing `List<FileAndPartitionFlag>` in a method signature gets no signal 
about what the flag represents. Something like `PartitionFileEntry` (with the 
deletion state carried by `isDeleted`) or `FileWithDeleteStatus` would make the 
intent clearer at the call site.
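   A sketch of how the proposed name reads at a call site. `FileWithDeleteStatus` is the name suggested above and `isDeleted` comes from this comment; the `partitionPath` and `fileName` fields and the `deletedFileNames` helper are assumptions, since the diff is cut off before the class body.

```java
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical rename of FileAndPartitionFlag; fields other than isDeleted are assumed. */
final class FileWithDeleteStatus implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String partitionPath;
  private final String fileName;
  private final boolean isDeleted;

  FileWithDeleteStatus(String partitionPath, String fileName, boolean isDeleted) {
    this.partitionPath = partitionPath;
    this.fileName = fileName;
    this.isDeleted = isDeleted;
  }

  String partitionPath() { return partitionPath; }

  String fileName() { return fileName; }

  boolean isDeleted() { return isDeleted; }

  /** Call site: the type name already tells the reader what the boolean means. */
  static List<String> deletedFileNames(List<FileWithDeleteStatus> files) {
    return files.stream()
        .filter(FileWithDeleteStatus::isDeleted)
        .map(FileWithDeleteStatus::fileName)
        .collect(Collectors.toList());
  }
}
```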
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.columnstats;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
+import static org.apache.hudi.index.HoodieIndexUtils.register;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnsToIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+
+/**
+ * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata
+ */
+@Slf4j
+public class ColumnStatsIndexer extends BaseIndexer {
+  private final Lazy<List<String>> columnsToIndex;
+
+  public ColumnStatsIndexer(HoodieEngineContext engineContext,
+                               HoodieWriteConfig dataTableWriteConfig,
+                               HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.columnsToIndex = Lazy.lazily(() ->
+        new ArrayList<>(getColumnsToIndex(
+            dataTableMetaClient.getTableConfig(),
+            dataTableWriteConfig.getMetadataConfig(),
+            Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)),
+            true,
+            Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()),
+            existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataTableMetaClient)).keySet()));
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException {
+    final int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    // columnsToIndex can be empty if meta fields are disabled and cols to index is not explicitly overridden.
+    if (partitionToAllFilesMap.isEmpty() || columnsToIndex.get().isEmpty()) {
+      return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData()));
+    }
+
+    log.info("Indexing {} columns for column stats index", columnsToIndex.get().size());
+    // during initialization, we need stats for base and log files.
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, Collections.emptyMap(), partitionToAllFilesMap,
+        dataTableMetaClient, dataTableWriteConfig.getColumnStatsIndexParallelism(),
+        dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
+        columnsToIndex.get());
+    return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildUpdate(
+      String instantTime,
+      HoodieBackedTableMetadata tableMetadata,
+      Lazy<HoodieTableFileSystemView> lazyFileSystemView,
+      HoodieCommitMetadata commitMetadata) {
+    final HoodieData<HoodieRecord> records = convertMetadataToColumnStatsRecords(commitMetadata, engineContext,
+        dataTableMetaClient, dataTableWriteConfig.getMetadataConfig(), Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()));
+    return Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(), records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildClean(String instantTime, HoodieCleanMetadata cleanMetadata) {
+    final HoodieData<HoodieRecord> records =
+        convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataTableMetaClient,
+            dataTableWriteConfig.getMetadataConfig(), Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()));
+    return Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(), records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildRestore(String instantTime, List<String> deletedPartitions, Map<String, List<FileInfo>> filesAdded, Map<String, List<String>> filesDeleted) {
+    if (filesDeleted.isEmpty() && filesAdded.isEmpty()) {
+      return Collections.emptyList();
+    }
+    Lazy<Option<HoodieSchema>> tableSchema =
+        Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient));
+    final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(

Review Comment:
   🤖 nit: the local `columnsToIndex` shadows the class field 
`this.columnsToIndex` (a `Lazy<List<String>>`) but is computed with a different 
`includeMetaFields=false` flag — could you rename it to something like 
`restoreColumnsToIndex` to make the distinction clear?
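
   For illustration only, here is a minimal self-contained sketch of the rename under stated assumptions. `ColumnStatsIndexerSketch` and `resolveColumns(boolean includeMetaFields)` are hypothetical names standing in for `ColumnStatsIndexer` and `HoodieTableMetadataUtil.getColumnsToIndex(...)`. A plain `Supplier` stands in for Hudi's `Lazy`, and the flag is assumed to toggle a single meta column:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for ColumnStatsIndexer. resolveColumns() is an
// assumed helper standing in for HoodieTableMetadataUtil.getColumnsToIndex(...), and
// Supplier stands in for Hudi's Lazy; the column names are made up.
class ColumnStatsIndexerSketch {
  // Field: columns resolved with meta fields included (mirrors the `true` flag).
  private final Supplier<List<String>> columnsToIndex = () -> resolveColumns(true);

  // Assumption for illustration only: the flag just toggles one meta column.
  static List<String> resolveColumns(boolean includeMetaFields) {
    List<String> columns = new ArrayList<>();
    if (includeMetaFields) {
      columns.add("_hoodie_record_key");
    }
    columns.add("price");
    columns.add("ts");
    return columns;
  }

  List<String> initializationColumns() {
    // Unqualified `columnsToIndex` unambiguously means the field here.
    return columnsToIndex.get();
  }

  List<String> restoreColumns() {
    // A distinct local name avoids shadowing the field and signals the different flag.
    final List<String> restoreColumnsToIndex = resolveColumns(false);
    return restoreColumnsToIndex;
  }
}
```

   With a distinct local name, an unqualified `columnsToIndex` anywhere in the class still refers to the field. The two column lists, computed with different flags, cannot be mixed up.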
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.expression;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer;
+import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
+
+/**
+ * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index
+ */
+@Slf4j
+public class ExpressionIndexer extends BaseIndexer {
+
+  private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator;
+
+  public ExpressionIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient,
+      ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.expressionIndexRecordGenerator = expressionIndexRecordGenerator;
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException {
+    Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(
+        EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient);
+    if (expressionIndexPartitionsToInit.size() != 1) {
+      if (expressionIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping expression index initialization as only one expression index "
+            + "bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = expressionIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = lazyPartitionFileSlices.get();
+    List<FileInfoAndPartition> filesToIndex = new ArrayList<>();
+    partitionFileSlicePairs.forEach(fsp -> {
+      if (fsp.fileSlice().getBaseFile().isPresent()) {
+        filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), fsp.fileSlice().getBaseFile().get().getPath(), fsp.fileSlice().getBaseFile().get().getFileSize()));
+      }
+      fsp.fileSlice().getLogFiles()
+          .forEach(hoodieLogFile
+              -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize())));
+    });
+
+    int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    if (filesToIndex.isEmpty()) {
+      return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, engineContext.emptyHoodieData()));
+    }
+
+    int parallelism = Math.min(filesToIndex.size(), dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+    HoodieSchema tableSchema =
+        HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)
+            .orElseThrow(() -> new HoodieMetadataException("Table schema is not available for expression index initialization"));
+    HoodieSchema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataTableMetaClient, tableSchema);
+
+    HoodieData<HoodieRecord> records = expressionIndexRecordGenerator.buildInitialization(
+        filesToIndex, indexDefinition, dataTableMetaClient, parallelism,
+        tableSchema, readerSchema, engineContext.getStorageConf(), dataTableInstantTime);
+
+    return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildUpdate(String instantTime, HoodieBackedTableMetadata tableMetadata, Lazy<HoodieTableFileSystemView> lazyFileSystemView,
+                                                    HoodieCommitMetadata commitMetadata) {
+    if (!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(dataTableMetaClient)) {
+      log.info("Don't need to update expression index, since no expression index is available");
+      return Collections.emptyList();
+    }
+    return dataTableMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX))
+        .map(partition -> {
+          HoodieData<HoodieRecord> expressionIndexRecords;
+          try {
+            expressionIndexRecords = expressionIndexRecordGenerator.buildUpdate(dataTableMetaClient, tableMetadata, commitMetadata, partition, instantTime);
+          } catch (Exception e) {
+            throw new HoodieMetadataException(String.format("Failed to get expression index updates for partition %s", partition), e);
+          }
+          return IndexPartitionAndRecords.of(partition, expressionIndexRecords);
+        }).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildClean(String instantTime, HoodieCleanMetadata cleanMetadata) {
+    Option<HoodieIndexMetadata> indexMetadata = dataTableMetaClient.getIndexMetadata();
+    if (indexMetadata.isEmpty()) {
+      throw new HoodieMetadataException("Expression index metadata not found");
+    }
+    List<IndexPartitionAndRecords> indexRecordsList = new ArrayList<>();
+    HoodieIndexMetadata metadata = indexMetadata.get();
+    Map<String, HoodieIndexDefinition> indexDefinitions = metadata.getIndexDefinitions();
+    if (indexDefinitions.isEmpty()) {
+      throw new HoodieMetadataException("Expression index metadata not found");

Review Comment:
   🤖 nit: this throws the same message `"Expression index metadata not found"` 
as the check six lines above, but for a different condition (definitions map is 
empty vs. metadata object absent) — something like `"Expression index has no 
index definitions"` would make it easier to pinpoint which check failed.
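
   Here is a minimal sketch of the two checks with distinct messages, assuming simplified types. `Optional`, `Map<String, String>` and `IllegalStateException` stand in for Hudi's `Option`, the `HoodieIndexDefinition` map and `HoodieMetadataException`, and `ExpressionIndexCleanChecks` is a hypothetical class name:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: Optional stands in for Hudi's Option, Map<String, String> for the
// index-definitions map, and IllegalStateException for HoodieMetadataException.
class ExpressionIndexCleanChecks {
  static Map<String, String> requireIndexDefinitions(Optional<Map<String, String>> indexMetadata) {
    if (!indexMetadata.isPresent()) {
      // Failure 1: the index metadata object itself is absent.
      throw new IllegalStateException("Expression index metadata not found");
    }
    Map<String, String> indexDefinitions = indexMetadata.get();
    if (indexDefinitions.isEmpty()) {
      // Failure 2: metadata exists but contains no index definitions.
      throw new IllegalStateException("Expression index has no index definitions");
    }
    return indexDefinitions;
  }

  // Returns the failure message, or "ok" when both checks pass.
  static String check(Optional<Map<String, String>> indexMetadata) {
    try {
      requireIndexDefinitions(indexMetadata);
      return "ok";
    } catch (IllegalStateException e) {
      return e.getMessage();
    }
  }
}
```

   With distinct messages, the exception text alone shows which of the two preconditions failed.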
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
