hudi-agent commented on code in PR #18372:
URL: https://github.com/apache/hudi/pull/18372#discussion_r3295885668


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.expression;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer;
+import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
+
+/**
+ * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index
+ */
+@Slf4j
+public class ExpressionIndexer extends BaseIndexer {
+
+  private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator;
+
+  public ExpressionIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient,
+      ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.expressionIndexRecordGenerator = expressionIndexRecordGenerator;
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws 
IOException {
+    Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(
+        EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), 
dataTableMetaClient);
+    if (expressionIndexPartitionsToInit.size() != 1) {
+      if (expressionIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping expression index initialization as only one 
expression index "
+            + "bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = expressionIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, 
dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Expression Index 
definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = 
lazyPartitionFileSlices.get();
+    List<FileInfoAndPartition> filesToIndex = new ArrayList<>();
+    partitionFileSlicePairs.forEach(fsp -> {
+      if (fsp.fileSlice().getBaseFile().isPresent()) {
+        filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
fsp.fileSlice().getBaseFile().get().getPath(), 
fsp.fileSlice().getBaseFile().get().getFileSize()));
+      }
+      fsp.fileSlice().getLogFiles()
+          .forEach(hoodieLogFile
+              -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize())));
+    });
+
+    int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    if (filesToIndex.isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, engineContext.emptyHoodieData()));
+    }
+
+    int parallelism = Math.min(filesToIndex.size(), 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+    HoodieSchema tableSchema =
+        HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)
+            .orElseThrow(() -> new HoodieMetadataException("Table schema is 
not available for expression index initialization"));
+    HoodieSchema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataTableMetaClient, 
tableSchema);
+
+    HoodieData<HoodieRecord> records = 
expressionIndexRecordGenerator.buildInitialization(
+        filesToIndex, indexDefinition, dataTableMetaClient, parallelism,
+        tableSchema, readerSchema, engineContext.getStorageConf(), 
dataTableInstantTime);
+
+    return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildUpdate(String instantTime, 
HoodieBackedTableMetadata tableMetadata, Lazy<HoodieTableFileSystemView> 
lazyFileSystemView,
+                                                    HoodieCommitMetadata 
commitMetadata) {

Review Comment:
   🤖 The `buildClean` path throws when `dataTableMetaClient.getIndexMetadata()` 
is empty, but `ExpressionIndexer` is added to `enabledIndexerMap` whenever 
`metadataConfig.isExpressionIndexEnabled()` is true — even before any 
expression index has been bootstrapped. The caller comment in 
`update(HoodieCleanMetadata)` says "for index partitions not needed to handle, 
`buildClean` will return an empty list"; `buildUpdate` honors that contract, 
but the `buildClean` path violates it. Could you mirror `buildUpdate`'s 
`isMetadataPartitionAvailable` check in `buildClean` and return 
`Collections.emptyList()` instead, so cleans don't fail when the partition 
hasn't been bootstrapped yet?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1592,102 +1099,15 @@ public BatchMetadataConversionFunction(String 
instantTime, HoodieCommitMetadata
     }
 
     @Override
-    public Map<String, HoodieData<HoodieRecord>> convertMetadata() {
-      Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
-          HoodieMetadataWriteUtils.convertMetadataToRecords(
-              engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient, getTableMetadata(),
-              dataWriteConfig.getMetadataConfig(),
-              partitionsToUpdate, dataWriteConfig.getBloomFilterType(),
-              dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
-              Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
-
-      // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
-      // to the HoodieTableMetadataUtil class in hudi-common.
-      if (partitionsToUpdate.contains(RECORD_INDEX.getPartitionPath())) {
-        HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()),
 commitMetadata);
-        partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), 
partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
-      }
-      if (partitionsToUpdate.stream().anyMatch(partition -> 
partition.startsWith(EXPRESSION_INDEX.getPartitionPath()))) {
-        updateExpressionIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
-      }
-      if (partitionsToUpdate.stream().anyMatch(partition -> 
partition.startsWith(SECONDARY_INDEX.getPartitionPath()))) {
-        updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, 
instantTime);
-      }
-      return partitionToRecordMap;
-    }
-  }
-
-  /**
-   * Update expression index from {@link HoodieCommitMetadata}.
-   */
-  private void updateExpressionIndexIfPresent(HoodieCommitMetadata 
commitMetadata, String instantTime,
-                                              Map<String, 
HoodieData<HoodieRecord>> partitionToRecordMap) {
-    if 
(!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(dataMetaClient))
 {
-      return;
+    public List<IndexPartitionAndRecords> convertMetadata() {
+      return partitionsToUpdate.stream().flatMap(indexPartition -> {

Review Comment:
   🤖 This `partitionsToUpdate.stream().flatMap(...)` will invoke `buildUpdate` 
once per partition path, but `ExpressionIndexer.buildUpdate` and 
`SecondaryIndexer.buildUpdate` each iterate over ALL of their respective 
metadata partitions and return records for every one. With N expression (or 
secondary) index partitions, you'll get N×N `IndexPartitionAndRecords` entries 
in total — each `indexPartitionName` repeated N times — and all of them are 
then unioned in `tagRecordsWithLocation`. Could you dedupe by 
`MetadataPartitionType` first (e.g. 
`partitionsToUpdate.stream().map(MetadataPartitionType::fromPartitionPath).distinct()`)
so that each indexer's `buildUpdate` is called only once?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+
+/**
+ * Data class representing a file and partition with a flag.
+ */
+@Getter
+@AllArgsConstructor
+@Accessors(fluent = true)
+public class FileAndPartitionFlag implements Serializable {
+  private static final long serialVersionUID = 1L;

Review Comment:
   🤖 nit: the `Flag` suffix is pretty opaque — without looking at the fields 
you can't tell this is about deletion tracking. Something like 
`PartitionedFileWithDeleteStatus` or just `FileDeleteEntry` might make the 
intent obvious at call sites.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.secondary;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords;
+import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;
+
+/**
+ * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index
+ */
+@Slf4j
+public class SecondaryIndexer extends BaseIndexer {
+
+  public SecondaryIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String 
dataTableInstantTime, String instantTimeForPartition, Map<String, 
List<FileInfo>> partitionToAllFilesMap,
+                                                                
Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException {
+    Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, 
dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient);
+    if (secondaryIndexPartitionsToInit.size() != 1) {
+      if (secondaryIndexPartitionsToInit.size() > 1) {

Review Comment:
   🤖 nit: the outer `size() != 1` check with a nested `size() > 1` warning-only 
block makes the two cases (empty vs. too many) harder to see at a glance. Could 
you separate them into two explicit guards — `if (size() > 1) { log.warn; 
return empty; }` followed by `if (isEmpty()) { return empty; }` — so each case 
is self-contained? The same nested pattern appears in 
`ExpressionIndexer.buildInitialization`, so the change would apply there too.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/partitionstats/TestPartitionStatsIndexer.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
+ */
+
+package org.apache.hudi.metadata.index.partitionstats;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.util.Lazy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class TestPartitionStatsIndexer {
+
+  @Test
+  void testSkipForNonPartitionedTable() throws IOException {
+    HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.isTablePartitioned()).thenReturn(false);
+
+    PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, 
writeConfig, metaClient);
+    List<IndexPartitionInitialization> result = 
indexer.buildInitialization("001", "002", Collections.emptyMap(), 
Lazy.lazily(Collections::emptyList));
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  void testSkipWhenColumnStatsDisabled() throws IOException {
+    HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.isTablePartitioned()).thenReturn(true);
+    when(writeConfig.isMetadataColumnStatsIndexEnabled()).thenReturn(false);
+
+    PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, 
writeConfig, metaClient);
+    List<IndexPartitionInitialization> result = 
indexer.buildInitialization("001", "002", Collections.emptyMap(), 
Lazy.lazily(Collections::emptyList));
+    assertTrue(result.isEmpty());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  void testInitializeWithRealEngineContextAndIndexDataContent() throws 
IOException {
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf());
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.isTablePartitioned()).thenReturn(true);
+    when(writeConfig.isMetadataColumnStatsIndexEnabled()).thenReturn(true);
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(writeConfig.getRecordMerger()).thenReturn(recordMerger);
+    
when(recordMerger.getRecordType()).thenReturn(HoodieRecord.HoodieRecordType.AVRO);
+    when(metadataConfig.getPartitionStatsIndexFileGroupCount()).thenReturn(4);
+
+    HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) 
(HoodieData<?>) engineContext.parallelize(
+        
Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_part",
+            Collections.singletonMap("f_part.parquet", 33L), 
Collections.emptyList())),
+        1);
+
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = 
mockStatic(HoodieTableMetadataUtil.class)) {
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(any(), any(), 
any(), any(), any(), any()))
+          .thenReturn(records);
+
+      PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, 
writeConfig, metaClient);
+      List<IndexPartitionInitialization> initializationList = 
indexer.buildInitialization("001", "002", Collections.emptyMap(), 
Lazy.lazily(Collections::emptyList));
+      assertEquals(1, initializationList.size());
+
+      assertEquals(4, initializationList.get(0).totalFileGroups());
+      List<HoodieRecord> collected = 
initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList();
+      assertEquals(1, collected.size());
+      assertEquals("p_part", collected.get(0).getRecordKey());
+    }
+  }
+
+  @Test
+  void testBuildUpdateThrowsWhenColumnStatsPartitionNotAvailable() {
+    HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    
when(tableConfig.isMetadataPartitionAvailable(any(MetadataPartitionType.class))).thenReturn(false);
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(writeConfig.getRecordMerger()).thenReturn(recordMerger);
+    
when(recordMerger.getRecordType()).thenReturn(HoodieRecord.HoodieRecordType.AVRO);
+
+    PartitionStatsIndexer indexer = new PartitionStatsIndexer(engineContext, 
writeConfig, metaClient);
+    assertThrows(IllegalStateException.class, () -> indexer.buildUpdate(
+        "010",
+        mock(HoodieBackedTableMetadata.class),
+        Lazy.lazily(() -> mock(HoodieTableFileSystemView.class)),
+        new HoodieCommitMetadata()));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  void testBuildUpdateWithNonEmptyCommitMetadataProducesPartitionEntry() {
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf());
+    HoodieEngineContext realEngineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf());

Review Comment:
   🤖 nit: both `engineContext` and `realEngineContext` are 
`HoodieLocalEngineContext` instances here, so the "real" prefix is misleading — 
it implies the first one is not real. In the similar test in 
`TestColumnStatsIndexer`, the distinction is `mock(HoodieEngineContext.class)` 
vs. `HoodieLocalEngineContext`, which is what justifies the naming there. Could 
you rename this one to something like `testDataEngineContext` or 
`parallelizeEngineContext` to clarify its role?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1211,35 +725,32 @@ private static List<HoodieRecord> 
convertMetadataToRollbackRecords(HoodieRollbac
    * @param partitionToAppendedFiles The {@code Map} to fill with files 
appended per partition and their sizes.
    */
   private static void processRollbackMetadata(HoodieRollbackMetadata 
rollbackMetadata,
-                                              Map<String, Map<String, Long>> 
partitionToAppendedFiles) {
+                                              Map<String, 
List<FileInfoAndPartition>> partitionToAppendedFiles) {
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
       // Has this rollback produced new files?
       boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && 
!pm.getRollbackLogFiles().isEmpty();
       final String partition = pm.getPartitionPath();
       final String partitionId = 
getPartitionIdentifierForFilesPartition(partition);
 
-      BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
-        // if a file exists in both written log files and rollback log files, 
we want to pick the one that is higher
-        // as rollback file could have been updated after written log files 
are computed.
-        return oldSize > newSizeCopy ? oldSize : newSizeCopy;
-      };
-
       if (hasRollbackLogFiles) {

Review Comment:
   🤖 nit: this immediately converts the existing `List<FileInfoAndPartition>` 
to a `Map<String, Long>` for merging, then converts the result back to a `List` 
at line 749 (new-file numbering). The round-trip is surprising to a reader and 
only arises because the accumulation type was changed from `Map<String, Long>` 
to `List<FileInfoAndPartition>`. Could you keep a local `Map<String, Map<String, 
Long>>` as the accumulation structure inside `processRollbackMetadata` and do 
the conversion to `List<FileInfoAndPartition>` only once, at the point where 
the result is placed back into `partitionToAppendedFiles`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.expression;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer;
+import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
+
+/**
+ * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index
+ */
+@Slf4j
+public class ExpressionIndexer extends BaseIndexer {
+
+  private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator;
+
+  public ExpressionIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient,
+      ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.expressionIndexRecordGenerator = expressionIndexRecordGenerator;
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws 
IOException {
+    Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(
+        EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), 
dataTableMetaClient);
+    if (expressionIndexPartitionsToInit.size() != 1) {
+      if (expressionIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping expression index initialization as only one 
expression index "
+            + "bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = expressionIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, 
dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Expression Index 
definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = 
lazyPartitionFileSlices.get();
+    List<FileInfoAndPartition> filesToIndex = new ArrayList<>();
+    partitionFileSlicePairs.forEach(fsp -> {
+      if (fsp.fileSlice().getBaseFile().isPresent()) {
+        filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
fsp.fileSlice().getBaseFile().get().getPath(), 
fsp.fileSlice().getBaseFile().get().getFileSize()));
+      }
+      fsp.fileSlice().getLogFiles()
+          .forEach(hoodieLogFile
+              -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize())));
+    });
+
+    int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    if (filesToIndex.isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, engineContext.emptyHoodieData()));
+    }
+
+    int parallelism = Math.min(filesToIndex.size(), 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+    HoodieSchema tableSchema =
+        HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)
+            .orElseThrow(() -> new HoodieMetadataException("Table schema is 
not available for expression index initialization"));
+    HoodieSchema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataTableMetaClient, 
tableSchema);
+
+    HoodieData<HoodieRecord> records = 
expressionIndexRecordGenerator.buildInitialization(
+        filesToIndex, indexDefinition, dataTableMetaClient, parallelism,
+        tableSchema, readerSchema, engineContext.getStorageConf(), 
dataTableInstantTime);
+
+    return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildUpdate(String instantTime, 
HoodieBackedTableMetadata tableMetadata, Lazy<HoodieTableFileSystemView> 
lazyFileSystemView,
+                                                    HoodieCommitMetadata 
commitMetadata) {
+    if 
(!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(dataTableMetaClient))
 {
+      log.info("Don't need to update expression index, since no expression 
index is available");
+      return Collections.emptyList();
+    }
+    return dataTableMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX))
+        .map(partition -> {
+          HoodieData<HoodieRecord> expressionIndexRecords;
+          try {
+            expressionIndexRecords = 
expressionIndexRecordGenerator.buildUpdate(dataTableMetaClient, tableMetadata, 
commitMetadata, partition, instantTime);
+          } catch (Exception e) {
+            throw new HoodieMetadataException(String.format("Failed to get 
expression index updates for partition %s", partition), e);
+          }
+          return IndexPartitionAndRecords.of(partition, 
expressionIndexRecords);
+        }).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildClean(String instantTime, 
HoodieCleanMetadata cleanMetadata) {
+    Option<HoodieIndexMetadata> indexMetadata = 
dataTableMetaClient.getIndexMetadata();
+    if (indexMetadata.isEmpty()) {
+      throw new HoodieMetadataException("Expression index metadata not found");
+    }
+    List<IndexPartitionAndRecords> indexRecordsList = new ArrayList<>();
+    HoodieIndexMetadata metadata = indexMetadata.get();
+    Map<String, HoodieIndexDefinition> indexDefinitions = 
metadata.getIndexDefinitions();
+    if (indexDefinitions.isEmpty()) {
+      throw new HoodieMetadataException("Expression index metadata not found");
+    }

Review Comment:
   🤖 nit: this message is identical to the one thrown at line 161, so there's 
no way to tell the two conditions apart in logs. Could you use a distinct 
message here, e.g. `"Expression index metadata contains no index definitions"`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/DirectoryInfo.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
+import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
+
+/**
+ * A class which represents a directory and the files and directories inside 
it.
+ * <p>
+ * A {@code DirectoryInfo} object saves the name of the partition and various 
properties of each file that are
+ * required for initializing the metadata table. Saving limited properties 
reduces the total memory footprint when
+ * a very large number of files are present in the dataset being initialized.
+ */
+@Getter
+public class DirectoryInfo implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // Relative path of the directory (relative to the base directory)
+  private final String relativePath;
+  // Map of filenames within this partition to their respective sizes
+  private final Map<String, Long> filenameToSizeMap;
+  // List of directories within this partition
+  private final List<StoragePath> subDirectories = new ArrayList<>();
+  // Is this a hoodie partition
+  private boolean isHoodiePartition = false;
+
+  public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, 
String maxInstantTime, Set<String> pendingDataInstants) {
+    this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true);
+  }
+
+  /**
+   * When files are directly fetched from Metadata table we do not need to 
validate HoodiePartitions.
+   */
+  public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, 
String maxInstantTime, Set<String> pendingDataInstants,
+                       boolean validateHoodiePartitions) {
+    this.relativePath = relativePath;
+
+    // Pre-allocate with the maximum length possible
+    filenameToSizeMap = new HashMap<>(pathInfos.size());
+
+    // Presence of partition meta file implies this is a HUDI partition
+    // if input files are directly fetched from MDT, it may not contain the 
HoodiePartitionMetadata file. So, we can ignore the validation for 
isHoodiePartition.
+    isHoodiePartition = !validateHoodiePartitions || 
pathInfos.stream().anyMatch(status -> 
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
+    for (StoragePathInfo pathInfo : pathInfos) {
+      // Do not attempt to search for more subdirectories inside directories 
that are partitions
+      if (!isHoodiePartition && pathInfo.isDirectory()) {
+        // Ignore .hoodie directory as there cannot be any partitions inside it
+        if 
(!pathInfo.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+          this.subDirectories.add(pathInfo.getPath());
+        }
+      } else if (isHoodiePartition && FSUtils.isDataFile(pathInfo.getPath())) {
+        // Regular HUDI data file (base file or log file)
+        String dataFileCommitTime = 
FSUtils.getCommitTime(pathInfo.getPath().getName());
+        // Limit the file listings to files which were created by successful 
commits before the maxInstant time.
+        if (!pendingDataInstants.contains(dataFileCommitTime) && 
compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, maxInstantTime)) {
+          filenameToSizeMap.put(pathInfo.getPath().getName(), 
pathInfo.getLength());
+        }
+      }
+    }
+  }
+
+  public static Map<String, List<FileInfo>> 
getPartitionToFileInfo(List<DirectoryInfo> partitionInfoList) {
+    return partitionInfoList.stream()
+        .map(p -> {
+          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());

Review Comment:
   🤖 nit: the lambda variable `p` is easy to mistake for a partition string 
given the surrounding context — could you rename it to `dirInfo` or `dir` to 
make it clear it's a `DirectoryInfo`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.columnstats;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.index.model.IndexPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
+import static org.apache.hudi.index.HoodieIndexUtils.register;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnsToIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+
+/**
+ * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata
+ */
+@Slf4j
+public class ColumnStatsIndexer extends BaseIndexer {
+  private final Lazy<List<String>> columnsToIndex;
+
+  public ColumnStatsIndexer(HoodieEngineContext engineContext,
+                               HoodieWriteConfig dataTableWriteConfig,
+                               HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.columnsToIndex = Lazy.lazily(() ->
+        new ArrayList<>(getColumnsToIndex(
+            dataTableMetaClient.getTableConfig(),
+            dataTableWriteConfig.getMetadataConfig(),
+            Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)),
+            true,
+            Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()),
+            existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, 
dataTableMetaClient)).keySet()));
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws 
IOException {
+    final int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    // columnsToIndex can be empty if meta fields are disabled and cols to 
index is not explicitly overridden.
+    if (partitionToAllFilesMap.isEmpty() || columnsToIndex.get().isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData()));
+    }
+
+    log.info("Indexing {} columns for column stats index", 
columnsToIndex.get().size());
+    // during initialization, we need stats for base and log files.
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, Collections.emptyMap(), partitionToAllFilesMap,
+        dataTableMetaClient, 
dataTableWriteConfig.getColumnStatsIndexParallelism(),
+        dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
+        columnsToIndex.get());
+    return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildUpdate(
+      String instantTime,
+      HoodieBackedTableMetadata tableMetadata,
+      Lazy<HoodieTableFileSystemView> lazyFileSystemView,
+      HoodieCommitMetadata commitMetadata) {
+    final HoodieData<HoodieRecord> records = 
convertMetadataToColumnStatsRecords(commitMetadata, engineContext,
+        dataTableMetaClient, dataTableWriteConfig.getMetadataConfig(), 
Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()));
+    return 
Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(),
 records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildClean(String instantTime, 
HoodieCleanMetadata cleanMetadata) {
+    final HoodieData<HoodieRecord> records =
+        convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, 
dataTableMetaClient,
+            dataTableWriteConfig.getMetadataConfig(), 
Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()));
+    return 
Collections.singletonList(IndexPartitionAndRecords.of(COLUMN_STATS.getPartitionPath(),
 records));
+  }
+
+  @Override
+  public List<IndexPartitionAndRecords> buildRestore(String instantTime, 
List<String> deletedPartitions, Map<String, List<FileInfo>> filesAdded, 
Map<String, List<String>> filesDeleted) {
+    if (filesDeleted.isEmpty() && filesAdded.isEmpty()) {
+      return Collections.emptyList();
+    }
+    Lazy<Option<HoodieSchema>> tableSchema =
+        Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient));
+    final List<String> columnsToIndex = new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(

Review Comment:
   🤖 nit: this local `columnsToIndex` has the same name as the class field 
(line 74) but is computed with a different `populateMetaFields` argument 
(`false` here vs. `true` in the field). Could you rename it to something like 
`restoreColumnsToIndex` and add a short comment explaining why the argument 
differs, to avoid a future reader swapping in `this.columnsToIndex.get()` and 
silently changing behavior?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to