yihua commented on code in PR #18348:
URL: https://github.com/apache/hudi/pull/18348#discussion_r3036289765


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/ExpressionIndexRecordGenerator.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import java.util.List;
+
+/**
+ * Engine-specific generator for expression index records used during metadata 
index bootstrap.
+ */
+public interface ExpressionIndexRecordGenerator {
+  EngineType getEngineType();
+
+  /**
+   * Generates expression index records.

Review Comment:
   πŸ€– nit: the `@param filesToIndex` javadoc still says "triplet of partition, 
(file path, file size)" but the type is now `List<FileInfoAndPartition>` β€” 
worth updating the description to match the new model class.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/record/TestRecordIndexer.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.util.Lazy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class TestRecordIndexer {
+
+  @Test
+  void testGetDataCreatesDefinitionAndReturnsInitialization() throws 
IOException {
+    HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieData<HoodieRecord> records = mock(HoodieData.class);
+
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(metadataConfig.getRecordIndexMaxParallelism()).thenReturn(4);
+
+    DataPartitionAndRecords init = new DataPartitionAndRecords(2, 
Option.empty(), records);
+    ExposedRecordIndexer indexer = new ExposedRecordIndexer(engineContext, 
writeConfig, metaClient, init);
+
+    try (MockedStatic<org.apache.hudi.metadata.HoodieTableMetadataUtil> 
mockedUtil = 
mockStatic(org.apache.hudi.metadata.HoodieTableMetadataUtil.class)) {

Review Comment:
   πŸ€– nit: same fully-qualified 
`org.apache.hudi.metadata.HoodieTableMetadataUtil` usage as in 
`TestPartitionedRecordIndexer` β€” an import would clean this up and it repeats 
on lines 63, 111, and 116 too.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
+/**
+ * Data class representing a file and partition with a flag.
+ */
+@Getter
+@AllArgsConstructor
+@Accessors(fluent = true)
+public class FileAndPartitionFlag {
+  private final String partitionPath;
+  private final String fileName;
+  private final boolean flag;
+

Review Comment:
   πŸ€– nit: `flag` is pretty opaque β€” it's unclear what this boolean represents 
at the call site. Something like `isDeleted` or `isAdded` (or whatever semantic 
it carries) would read more clearly.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/FileSliceAndPartition.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import org.apache.hudi.common.model.FileSlice;
+
+import lombok.AllArgsConstructor;
+import lombok.ToString;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * Holder for a {@link FileSlice} and its partition path.
+ */
+@AllArgsConstructor
+@Getter
+@Setter

Review Comment:
   πŸ€– nit: `@Setter` makes this mutable, which is inconsistent with `FileInfo` 
and `FileInfoAndPartition` in the same package β€” both use `final` fields and no 
setters. If mutation isn't needed here, have you considered dropping `@Setter` 
and making the fields `final`?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -964,7 +964,7 @@ public void 
testMetadataTableDeletePartition(HoodieTableType tableType) throws E
         assertEquals("0000003", completedReplaceInstant.get().requestedTime());
 
         final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes 
= new HashMap<>();
-        metadataWriter.getEnabledPartitionTypes().forEach(e -> 
metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
+        metadataWriter.getEnabledIndexerMap().keySet().forEach(e -> 
metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));

Review Comment:
   πŸ€– nit: `e` isn't very expressive here β€” could you rename it to 
`partitionType` to match the domain concept? Same applies at the identical 
forEach on line 4105.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/FileInfoAndPartition.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
+/**
+ * Immutable descriptor for a file that should be scanned during index 
bootstrap.
+ */
+@Getter
+@Accessors(fluent = true)
+public class FileInfoAndPartition {
+  private final String partitionPath;
+  private final String name;
+  private final long size;
+
+  private FileInfoAndPartition(String partitionPath, String name, long size) {
+    this.partitionPath = partitionPath;
+    this.name = name;
+    this.size = size;
+  }
+
+  public static FileInfoAndPartition of(String partition, String path, long 
size) {
+    return new FileInfoAndPartition(partition, path, size);

Review Comment:
   πŸ€– nit: the factory parameter is named `path` but the backing field is `name` 
β€” could you align them so readers don't have to mentally translate at the call 
site?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1308,20 +1270,20 @@ public static String getPartitionIdentifier(@Nonnull 
String relativePartitionPat
    */
   public static HoodieData<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
                                                                           
Map<String, List<String>> partitionToDeletedFiles,
-                                                                          
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          
Map<String, List<FileInfo>> partitionToAppendedFiles,
                                                                           
String instantTime,
                                                                           
HoodieTableMetaClient dataMetaClient,
                                                                           int 
bloomIndexParallelism,
                                                                           
String bloomFilterType) {
     // Create the tuple (partition, filename, isDeleted) to handle both 
deletes and appends
-    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = 
fetchPartitionFileInfoTriplets(partitionToDeletedFiles, 
partitionToAppendedFiles);
+    final List<FileAndPartitionFlag> partitionFileFlagTupleList = 
fetchPartitionFileInfo(partitionToDeletedFiles, partitionToAppendedFiles);
 
     // Create records MDT
     int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), 
bloomIndexParallelism), 1);
     return engineContext.parallelize(partitionFileFlagTupleList, 
parallelism).flatMap(partitionFileFlagTuple -> {
-      final String partitionName = partitionFileFlagTuple.f0;
-      final String filename = partitionFileFlagTuple.f1;
-      final boolean isDeleted = partitionFileFlagTuple.f2;
+      final String partitionName = partitionFileFlagTuple.partitionPath();

Review Comment:
   πŸ€– nit: `partitionFileFlagTuple.flag()` doesn't read as a boolean predicate β€” 
could the accessor on `FileAndPartitionFlag` be named `isDeleted()` to make the 
intent obvious at the call site?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.record.HoodieRecordIndex;
+import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * Implementation of the global {@link MetadataPartitionType#RECORD_INDEX} 
index
+ */
+@Slf4j
+public class PartitionedRecordIndexer extends BaseRecordIndexer {
+  public PartitionedRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig,
+                                     HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String 
dataTableInstantTime, String instantTimeForPartition, Map<String, 
List<FileInfo>> partitionToAllFilesMap,
+                                                                
Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) 
throws IOException {
+    createRecordIndexDefinition(dataTableMetaClient, 
Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, "true"));
+    Map<String, List<FileSliceAndPartition>> partitionFileSlicePairsMap = 
lazyLatestMergedPartitionFileSliceList.get().stream()
+        .collect(Collectors.groupingBy(FileSliceAndPartition::partitionPath));
+    Map<String, DataPartitionAndRecords> fileGroupCountAndRecordsPairMap = new 
HashMap<>(partitionFileSlicePairsMap.size());
+    int maxParallelismPerHudiPartition = partitionFileSlicePairsMap.isEmpty()
+        ? 1 : Math.max(1, 
dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism() / 
partitionFileSlicePairsMap.size());
+    int totalFileGroupCount = 0;

Review Comment:
   πŸ€– nit: iterating over `keySet()` and then calling 
`partitionFileSlicePairsMap.get(partition)` inside is a classic two-lookup 
pattern. The second loop in this same method already uses `entrySet()` β€” could 
you align the first loop to do the same?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/IndexerFactory.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer;
+import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer;
+import org.apache.hudi.metadata.index.expression.ExpressionIndexer;
+import org.apache.hudi.metadata.index.files.FilesIndexer;
+import org.apache.hudi.metadata.index.partitionstats.PartitionStatsIndexer;
+import org.apache.hudi.metadata.index.record.PartitionedRecordIndexer;
+import org.apache.hudi.metadata.index.record.RecordIndexer;
+import org.apache.hudi.metadata.index.secondary.SecondaryIndexer;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for creating {@link Indexer} implementations and resolving enabled 
indexers
+ * based on table and metadata configuration.
+ */

Review Comment:
   πŸ€– nit: `getIndexer` sounds like a retrieval from cache or registry; since 
it's always constructing a new instance, `createIndexer` would better signal 
its intent as a factory method.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.record.HoodieRecordIndex;
+import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**

Review Comment:
   πŸ€– nit: the Javadoc says "Implementation of the **global** `RECORD_INDEX` 
index" β€” but this class is the *partitioned* variant. `RecordIndexer` is the 
global one. Might be worth updating to avoid confusion.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema;
+
+/**
+ * Base implementation for record-index.
+ */
+@Slf4j
+public abstract class BaseRecordIndexer extends BaseIndexer {
+
+  private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
+
+  protected BaseRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    return initializeRecordIndexPartition(null, 
latestMergedPartitionFileSliceList, recordIndexMaxParallelism);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      String dataPartition,
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    log.info("Initializing record index from {} file slices", 
latestMergedPartitionFileSliceList.size());
+    HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot(
+        engineContext,
+        latestMergedPartitionFileSliceList,
+        recordIndexMaxParallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        dataTableWriteConfig);
+
+    // Initialize the file groups
+    final int fileGroupCount = estimateFileGroupCount(records);
+    log.info("Initializing record index with {} file groups.", fileGroupCount);
+    return new DataPartitionAndRecords(fileGroupCount, 
Option.ofNullable(dataPartition), records);
+  }
+
+  @Override
+  public void postInitialization(HoodieTableMetaClient metadataMetaClient, 
HoodieData<HoodieRecord> records, int fileGroupCount, String 
relativePartitionPath) {
+    super.postInitialization(metadataMetaClient, records, fileGroupCount, 
relativePartitionPath);
+    // Validate record index after commit if validation is enabled
+    if 
(dataTableWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled())
 {
+      validateRecordIndex(records, fileGroupCount, metadataMetaClient);
+    }
+    records.unpersist();
+  }
+
+  /**
+   * Validates the record index after bootstrap by comparing the expected 
record count with the actual
+   * record count stored in the metadata table. The validation is performed in 
a distributed manner
+   * using the engine context to count records from HFiles in parallel.
+   *
+   * @param recordIndexRecords the HoodieData containing the expected records
+   * @param fileGroupCount the expected number of file groups
+   * @param metadataMetaClient meta client for the metadata table
+   */
+  protected void validateRecordIndex(HoodieData<HoodieRecord> 
recordIndexRecords, int fileGroupCount, HoodieTableMetaClient 
metadataMetaClient) {
+    String partitionName = 
MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    try {
+      // Use merged file slices to handle cases with pending compactions
+      List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
fsView, partitionName);
+
+      // Filter to only file slices with base files and extract their storage 
paths
+      List<StoragePath> baseFilePaths = fileSlices.stream()
+          .filter(fs -> fs.getBaseFile().isPresent())
+          .map(fs -> fs.getBaseFile().get().getStoragePath())
+          .collect(Collectors.toList());
+
+      // Count records in a distributed manner using the engine context
+      long totalRecords = countRecordsInHFiles(baseFilePaths, 
metadataMetaClient);
+      long expectedRecordCount = recordIndexRecords.count();
+
+      ValidationUtils.checkArgument(totalRecords == expectedRecordCount, 
"Record Count Validation failed with "
+          + totalRecords + " present in record index vs the expected " + 
expectedRecordCount);
+      log.info(String.format("Record index initialized on %d shards (expected 
= %d) with %d records (expected = %d)",
+          fileSlices.size(), fileGroupCount, totalRecords, 
expectedRecordCount));
+    } finally {
+      fsView.close();
+    }
+  }
+
+  /**
+   * Counts the total number of records in HFiles in a distributed manner.
+   *
+   * @param baseFilePaths list of storage paths to HFiles
+   * @param metadataMetaClient meta client for the metadata table
+   * @return total number of records across all HFiles
+   */
+  private long countRecordsInHFiles(List<StoragePath> baseFilePaths, 
HoodieTableMetaClient metadataMetaClient) {
+    if (baseFilePaths.isEmpty()) {
+      return 0L;
+    }
+
+    int parallelism = Math.min(baseFilePaths.size(), 
dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = 
metadataMetaClient.getStorageConf();
+    HoodieFileFormat baseFileFormat = 
metadataMetaClient.getTableConfig().getBaseFileFormat();
+
+    return engineContext.parallelize(baseFilePaths, parallelism)
+        .mapPartitions(pathIterator -> {
+          long count = 0L;
+          while (pathIterator.hasNext()) {
+            StoragePath path = pathIterator.next();
+            try {
+              HoodieStorage storage = HoodieStorageUtils.getStorage(path, 
storageConfBroadcast);
+              HoodieConfig readerConfig = new HoodieConfig();
+              HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(storage)
+                  .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+                  .getFileReader(readerConfig, path, baseFileFormat, 
Option.empty());
+              try {
+                count += reader.getTotalRecords();
+              } finally {
+                reader.close();
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading total records from 
file " + path, e);

Review Comment:
   πŸ€– nit: small typo in the Javadoc β€” "context ot use" should be "context to 
use".



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/bloomfilters/TestBloomFiltersIndexer.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package org.apache.hudi.metadata.index.bloomfilters;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.util.Lazy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class TestBloomFiltersIndexer {
+
+  @SuppressWarnings("unchecked")
+  @Test
+  void testInitializeDataWithRealEngineContextAndIndexDataContent() throws 
IOException {
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf());
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(writeConfig.getBloomIndexParallelism()).thenReturn(8);
+    when(writeConfig.getBloomFilterType()).thenReturn("DYNAMIC_V0");
+    when(metadataConfig.getBloomFilterIndexFileGroupCount()).thenReturn(3);
+
+    HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) 
(HoodieData<?>) engineContext.parallelize(
+        
Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_bloom",
+            Collections.singletonMap("f1.parquet", 11L), 
Collections.emptyList())),
+        1);
+
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = 
mockStatic(HoodieTableMetadataUtil.class)) {
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(any(), any(), any(), 
any(), any(), anyInt(), any()))
+          .thenReturn(records);
+
+      ExposedBloomFiltersIndexer indexer = new 
ExposedBloomFiltersIndexer(engineContext, writeConfig, metaClient);
+      List<IndexPartitionInitialization> initializationList = 
indexer.callGetData("001", "002", Collections.emptyMap(), 
Lazy.lazily(Collections::emptyList));
+      assertEquals(1, initializationList.size());
+
+      assertEquals(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), 
initializationList.get(0).indexPartitionName());
+      assertEquals(3, initializationList.get(0).totalFileGroups());
+      List<HoodieRecord> collected = 
initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList();
+      assertEquals(1, collected.size());
+      assertEquals("p_bloom", collected.get(0).getRecordKey());
+    }

Review Comment:
   πŸ€– nit: `callGetData` is a bit opaque β€” have you considered naming this 
`callBuildInitialization` or simply `buildInitialization` (overriding 
visibility) to make it obvious what method is being exercised?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.secondary;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;
+
+/**
+ * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index
+ */
+@Slf4j
+public class SecondaryIndexer extends BaseIndexer {
+

Review Comment:
   πŸ€– nit: `RECORD_INDEX_AVERAGE_RECORD_SIZE = 48` is also defined in 
`BaseRecordIndexer`. Since `SecondaryIndexer` can't inherit it, could this 
constant live in a shared place (e.g. `HoodieTableMetadataUtil` or a 
`RecordIndexConstants` class) to avoid the duplication?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema;
+
+/**
+ * Base implementation for record-index.
+ */
+@Slf4j
+public abstract class BaseRecordIndexer extends BaseIndexer {
+
+  private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
+
+  protected BaseRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    return initializeRecordIndexPartition(null, 
latestMergedPartitionFileSliceList, recordIndexMaxParallelism);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      String dataPartition,
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    log.info("Initializing record index from {} file slices", 
latestMergedPartitionFileSliceList.size());
+    HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot(
+        engineContext,
+        latestMergedPartitionFileSliceList,
+        recordIndexMaxParallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        dataTableWriteConfig);
+
+    // Initialize the file groups
+    final int fileGroupCount = estimateFileGroupCount(records);
+    log.info("Initializing record index with {} file groups.", fileGroupCount);
+    return new DataPartitionAndRecords(fileGroupCount, 
Option.ofNullable(dataPartition), records);
+  }
+
+  @Override
+  public void postInitialization(HoodieTableMetaClient metadataMetaClient, 
HoodieData<HoodieRecord> records, int fileGroupCount, String 
relativePartitionPath) {
+    super.postInitialization(metadataMetaClient, records, fileGroupCount, 
relativePartitionPath);
+    // Validate record index after commit if validation is enabled
+    if 
(dataTableWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled())
 {
+      validateRecordIndex(records, fileGroupCount, metadataMetaClient);
+    }
+    records.unpersist();
+  }
+
+  /**
+   * Validates the record index after bootstrap by comparing the expected 
record count with the actual
+   * record count stored in the metadata table. The validation is performed in 
a distributed manner
+   * using the engine context to count records from HFiles in parallel.
+   *
+   * @param recordIndexRecords the HoodieData containing the expected records
+   * @param fileGroupCount the expected number of file groups
+   * @param metadataMetaClient meta client for the metadata table
+   */
+  protected void validateRecordIndex(HoodieData<HoodieRecord> 
recordIndexRecords, int fileGroupCount, HoodieTableMetaClient 
metadataMetaClient) {
+    String partitionName = 
MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    try {
+      // Use merged file slices to handle cases with pending compactions
+      List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
fsView, partitionName);
+
+      // Filter to only file slices with base files and extract their storage 
paths
+      List<StoragePath> baseFilePaths = fileSlices.stream()
+          .filter(fs -> fs.getBaseFile().isPresent())
+          .map(fs -> fs.getBaseFile().get().getStoragePath())
+          .collect(Collectors.toList());
+
+      // Count records in a distributed manner using the engine context
+      long totalRecords = countRecordsInHFiles(baseFilePaths, 
metadataMetaClient);
+      long expectedRecordCount = recordIndexRecords.count();
+
+      ValidationUtils.checkArgument(totalRecords == expectedRecordCount, 
"Record Count Validation failed with "
+          + totalRecords + " present in record index vs the expected " + 
expectedRecordCount);
+      log.info(String.format("Record index initialized on %d shards (expected 
= %d) with %d records (expected = %d)",
+          fileSlices.size(), fileGroupCount, totalRecords, 
expectedRecordCount));
+    } finally {
+      fsView.close();
+    }
+  }
+
+  /**
+   * Counts the total number of records in HFiles in a distributed manner.
+   *
+   * @param baseFilePaths list of storage paths to HFiles

Review Comment:
   πŸ€– nit: `storageConfBroadcast` implies a Spark broadcast variable, but this 
is just a plain local captured by the lambda. Something like `storageConf` 
would avoid the misleading connotation.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.columnstats;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.index.HoodieIndexUtils.register;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+
+/**
+ * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata
+ */
+@Slf4j
+public class ColumnStatsIndexer extends BaseIndexer {
+  private Lazy<List<String>> columnsToIndex;
+

Review Comment:
   πŸ€– nit: could `columnsToIndex` be `final`? It's assigned in the constructor 
but then silently reassigned to `emptyList` inside `buildInitialization` when 
the partition map is empty β€” that mutation on `this` in a method is a bit 
surprising. Would it be cleaner to make the field final and handle the 
empty-partition short-circuit without touching it?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.expression;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX;
+
+/**
+ * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index
+ */
+@Slf4j
+public class ExpressionIndexer extends BaseIndexer {
+
+  private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator;
+
+  public ExpressionIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient,
+      ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.expressionIndexRecordGenerator = expressionIndexRecordGenerator;
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) throws IOException {
+    Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(
+        EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), 
dataTableMetaClient);
+    if (expressionIndexPartitionsToInit.size() != 1) {
+      if (expressionIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping expression index initialization as only one 
expression index "
+            + "bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = expressionIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, 
dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Expression Index 
definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = 
lazyLatestMergedPartitionFileSliceList.get();
+    List<FileInfoAndPartition> filesToIndex = new ArrayList<>();
+    partitionFileSlicePairs.forEach(fsp -> {
+      if (fsp.fileSlice().getBaseFile().isPresent()) {
+        filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
fsp.fileSlice().getBaseFile().get().getPath(), 
fsp.fileSlice().getBaseFile().get().getFileSize()));
+      }
+      fsp.fileSlice().getLogFiles()
+          .forEach(hoodieLogFile
+              -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize())));
+    });
+
+    int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    if (partitionFileSlicePairs.isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, engineContext.emptyHoodieData()));
+    }
+
+    int parallelism = Math.min(filesToIndex.size(), 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+    Lazy<HoodieSchema> tableSchemaOpt = Lazy.lazily(() ->
+        HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)

Review Comment:
   πŸ€– nit: `tableSchemaOpt` reads like it holds an `Option`, but it's a 
`Lazy<HoodieSchema>` β€” and since `.get()` is called on the very next line, the 
`Lazy` wrapper adds no deferred-evaluation benefit here. Could you just inline 
it as `HoodieSchema tableSchema = 
HoodieTableMetadataUtil.tryResolveSchemaForTable(...).orElseThrow(...)`?



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/columnstats/TestColumnStatsIndexer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
+ */
+
+package org.apache.hudi.metadata.index.columnstats;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.util.Lazy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class TestColumnStatsIndexer {
+
+  @Test
+  void testInitializeDataWithEmptyInputUsesEmptyHoodieData() throws 
IOException {
+    HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieData<HoodieRecord> emptyData = mock(HoodieData.class);
+
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(metadataConfig.getColumnStatsIndexFileGroupCount()).thenReturn(2);
+    when(engineContext.emptyHoodieData()).thenReturn((HoodieData) emptyData);
+
+    ExposedColumnStatsIndexer indexer = new 
ExposedColumnStatsIndexer(engineContext, writeConfig, metaClient);
+    List<IndexPartitionInitialization> initializationList = 
indexer.callGetData("001", "002", Collections.emptyMap(), 
Lazy.lazily(Collections::emptyList));
+    assertEquals(1, initializationList.size());
+
+    assertEquals(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
initializationList.get(0).indexPartitionName());
+    assertSame(emptyData, 
initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  void testInitializeDataWithRealEngineContextAndIndexDataContent() throws 
IOException {
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf());
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(writeConfig.getColumnStatsIndexParallelism()).thenReturn(4);
+    when(writeConfig.getRecordMerger()).thenReturn(recordMerger);
+    
when(recordMerger.getRecordType()).thenReturn(HoodieRecord.HoodieRecordType.AVRO);
+    when(metadataConfig.getColumnStatsIndexFileGroupCount()).thenReturn(5);
+    when(metadataConfig.getMaxReaderBufferSize()).thenReturn(4096);
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+
+    Map<String, List<FileInfo>> files = new HashMap<>();
+    files.put("p1", Collections.singletonList(FileInfo.of("f1.parquet", 1L)));
+
+    HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) 
(HoodieData<?>) engineContext.parallelize(
+        
Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_col",
+            Collections.singletonMap("f_col.parquet", 22L), 
Collections.emptyList())),
+        1);
+
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = 
mockStatic(HoodieTableMetadataUtil.class)) {
+      Map<String, Object> columns = new HashMap<>();
+      columns.put("c1", new Object());
+      mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(any(), 
any(), any(), eq(true), eq(Option.of(HoodieRecord.HoodieRecordType.AVRO)), 
any()))
+          .thenReturn(columns);
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(any(), any(), any(), 
any(), anyInt(), anyInt(), any()))
+          .thenReturn(records);
+
+      ExposedColumnStatsIndexer indexer = new 
ExposedColumnStatsIndexer(engineContext, writeConfig, metaClient);
+      List<IndexPartitionInitialization> initializationList = 
indexer.callGetData("001", "002", files, Lazy.lazily(Collections::emptyList));
+      assertEquals(1, initializationList.size());
+
+      assertEquals(5, initializationList.get(0).totalFileGroups());
+      List<HoodieRecord> collected = 
initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList();
+      assertEquals(1, collected.size());
+      assertEquals("p_col", collected.get(0).getRecordKey());
+    }
+  }
+
+  private static class ExposedColumnStatsIndexer extends ColumnStatsIndexer {
+    ExposedColumnStatsIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient 
dataTableMetaClient) {
+      super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+    }
+
+    List<IndexPartitionInitialization> callGetData(String 
dataTableInstantTime, String instantTimeForPartition,

Review Comment:
   πŸ€– nit: same as `TestBloomFiltersIndexer` β€” `callGetData` could be renamed to 
`callBuildInitialization` to make it clear what's being tested.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.columnstats;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.index.HoodieIndexUtils.register;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+
+/**
+ * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata
+ */
+@Slf4j
+public class ColumnStatsIndexer extends BaseIndexer {
+  private Lazy<List<String>> columnsToIndex;
+
+  public ColumnStatsIndexer(HoodieEngineContext engineContext,
+                               HoodieWriteConfig dataTableWriteConfig,
+                               HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.columnsToIndex = Lazy.lazily(() ->
+        new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(
+            dataTableMetaClient.getTableConfig(),
+            dataTableWriteConfig.getMetadataConfig(),
+            Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)),
+            true,
+            Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()),
+            existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, 
dataTableMetaClient)).keySet()));
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) throws IOException {
+    final int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    if (partitionToAllFilesMap.isEmpty()) {
+      this.columnsToIndex = Lazy.lazily(Collections::emptyList);
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData()));
+    }
+
+    if (columnsToIndex.get().isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData()));
+    }
+
+    log.info("Indexing {} columns for column stats index", 
columnsToIndex.get().size());
+    // during initialization, we need stats for base and log files.
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, Collections.emptyMap(), partitionToAllFilesMap,
+        dataTableMetaClient, 
dataTableWriteConfig.getColumnStatsIndexParallelism(),
+        dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
+        columnsToIndex.get());
+    return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), records));
+  }
+
+  @Override
+  public void postInitialization(HoodieTableMetaClient metadataMetaClient, 
HoodieData<HoodieRecord> records, int fileGroupCount, String 
relativePartitionPath) {
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName(PARTITION_NAME_COLUMN_STATS)
+        .withIndexType(PARTITION_NAME_COLUMN_STATS)
+        .withIndexFunction(PARTITION_NAME_COLUMN_STATS)
+        .withSourceFields(columnsToIndex.get())
+        // Use the existing version if exists, otherwise fall back to the 
default version.
+        
.withVersion(existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, 
dataTableMetaClient))
+        .withIndexOptions(Collections.EMPTY_MAP)
+        .build();

Review Comment:
   πŸ€– nit: `Collections.EMPTY_MAP` is the raw-type constant β€” the rest of the 
metadata module consistently uses `Collections.emptyMap()` for type safety. 
Could you swap it here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +365,41 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     return metadataWriteConfig;
   }
 
+  /**
+   * Convert the clean action to metadata records.
+   */
+  public static Map<String, HoodieData<HoodieRecord>> 
convertMetadataToRecords(HoodieEngineContext engineContext,
+                                                                               
HoodieCleanMetadata cleanMetadata,
+                                                                               
String instantTime,
+                                                                               
HoodieTableMetaClient dataMetaClient,

Review Comment:
   πŸ€– nit: the parameter is called `enabledIndexBuilderMap` here but the same 
concept is `enabledIndexerMap` everywhere else in the codebase β€” could you 
align these to reduce the mental mapping?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
         partitionInfoList = Collections.emptyList();
       }
     }
-    Map<String, Map<String, Long>> partitionIdToAllFilesMap = 
partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
-          return Pair.of(partitionName, p.getFilenameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    // validate that each index is eligible to be initialized
-    Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
-    while (iterator.hasNext()) {
-      MetadataPartitionType partitionType = iterator.next();
-      if (partitionType == PARTITION_STATS && 
!dataMetaClient.getTableConfig().isTablePartitioned()) {
-        // Partition stats index cannot be enabled for a non-partitioned table
-        iterator.remove();
-        this.enabledPartitionTypes.remove(partitionType);
-      }
+    Map<String, List<FileInfo>> partitionIdToAllFilesMap = 
DirectoryInfo.getPartitionToFileInfo(partitionInfoList);
+    Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = 
getLazyLatestMergedPartitionFileSliceList();
+
+    // FILES partition should always be initialized first if enabled
+    if (!filesPartitionAvailable) {
+      initializeMetadataPartition(FILES, 
indexerMapForPartitionsToInit.get(FILES),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
+      hasPartitionsStateChanged = true;
     }
 
-    // For a fresh table, defer RLI initialization
-    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() 
&& this.enabledPartitionTypes.contains(RECORD_INDEX)
-        && 
dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 
0) {
-      this.enabledPartitionTypes.remove(RECORD_INDEX);
-      partitionsToInit.remove(RECORD_INDEX);
-    }
-
-    Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList 
= getLazyLatestMergedPartitionFileSliceList();
-    for (MetadataPartitionType partitionType : partitionsToInit) {
-      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
-      String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
-      String partitionTypeName = partitionType.name();
-      LOG.info("Initializing MDT partition {} at instant {}", 
partitionTypeName, instantTimeForPartition);
-      String relativePartitionPath;
-      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
-      Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
-      try {
-        switch (partitionType) {
-          case FILES:
-            fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
FILES.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case BLOOM_FILTERS:
-            fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case COLUMN_STATS:
-            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
colStatsColumnsAndRecord = 
initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema);
-            fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
-            initializeFilegroupsAndCommit(partitionType, 
COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition, colStatsColumnsAndRecord.getKey());
-            break;
-          case RECORD_INDEX:
-            boolean isPartitionedRLI = 
dataWriteConfig.isRecordLevelIndexEnabled();
-            
initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, 
lazyLatestMergedPartitionFileSliceList, isPartitionedRLI);
-            break;
-          case EXPRESSION_INDEX:
-            Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (expressionIndexPartitionsToInit.size() != 1) {
-              if (expressionIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping expression index initialization as only one 
expression index bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
expressionIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, 
lazyLatestMergedPartitionFileSliceList, tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          case PARTITION_STATS:
-            // For PARTITION_STATS, COLUMN_STATS should also be enabled
-            if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
-              LOG.debug("Skipping partition stats initialization as column 
stats index is not enabled. Please enable {}",
-                  
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
-              continue;
-            }
-            fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, 
tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case SECONDARY_INDEX:
-            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (secondaryIndexPartitionsToInit.size() != 1) {
-              if (secondaryIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
secondaryIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeSecondaryIndexPartition(relativePartitionPath, 
lazyLatestMergedPartitionFileSliceList);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          default:
-            throw new HoodieMetadataException(String.format("Unsupported MDT 
partition type: %s", partitionType));
-        }
-      } catch (Exception e) {
-        String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
-        metrics.ifPresent(m -> m.setMetric(metricKey, 1));
-        String errMsg = String.format("Bootstrap on %s partition failed for 
%s",
-            partitionType.getPartitionPath(), 
metadataMetaClient.getBasePath());
-        LOG.error(errMsg, e);
-        throw new HoodieMetadataException(errMsg, e);
-      }
+    for (Map.Entry<MetadataPartitionType, Indexer> entry :
+        indexerMapForPartitionsToInit.entrySet().stream()
+            .filter(e -> e.getKey() != FILES).collect(Collectors.toList())) {
+      initializeMetadataPartition(entry.getKey(), entry.getValue(),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
       hasPartitionsStateChanged = true;
     }
     return true;
   }
 
+  @SneakyThrows
+  private void initializeMetadataPartition(
+      MetadataPartitionType partitionType,
+      Indexer indexer,
+      String dataTableInstantTime,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) {
+    String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
+    // initialize metadata partitions
+    List<IndexPartitionInitialization> initializationList;
+    try {
+      initializationList = indexer.buildInitialization(
+          dataTableInstantTime, instantTimeForPartition, 
partitionToAllFilesMap, lazyLatestMergedPartitionFileSliceList);
+    } catch (Exception e) {
+      String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
+      metrics.ifPresent(m -> m.setMetric(metricKey, 1));
+      String errMsg = String.format("Bootstrap on %s partition failed for %s",
+          partitionType.getPartitionPath(), metadataMetaClient.getBasePath());
+      LOG.error(errMsg, e);
+      throw new HoodieMetadataException(errMsg, e);
+    }
+
+    if (initializationList.isEmpty()) {
+      LOG.info("Skip building {} index in metadata table", 
partitionType.getPartitionPath());
+      return;
+    }
+
+    ValidationUtils.checkArgument(initializationList.size() == 1,
+        "Only support the initialization of one partition per index type "
+            + "(HUDI-9358 for the feature support)");
+
+    IndexPartitionInitialization initialIndexPartitionData = 
initializationList.get(0);
+    final int numFileGroup = initialIndexPartitionData.totalFileGroups();
+    String relativePartitionPath = 
initialIndexPartitionData.indexPartitionName();
+    LOG.info("Initializing {} index with {} file groups", 
relativePartitionPath, numFileGroup);
+
+    HoodieTimer partitionInitTimer = HoodieTimer.start();
+    clearExistingMetadataPartition(relativePartitionPath);
+    HoodieData<HoodieRecord> records = engineContext.emptyHoodieData();
+    for (DataPartitionAndRecords dataPartitionAndRecords: 
initialIndexPartitionData.dataPartitionAndRecords()) {
+      initializeFileGroups(dataMetaClient, partitionType, 
instantTimeForPartition,
+          dataPartitionAndRecords.numFileGroups(), relativePartitionPath, 
dataPartitionAndRecords.dataPartition());
+      records = records.union(dataPartitionAndRecords.indexRecords());
+    }
+
+    bulkCommit(instantTimeForPartition, relativePartitionPath, records, 
initialIndexPartitionData.indexParser());
+
+    indexer.postInitialization(metadataMetaClient, records, numFileGroup, 
relativePartitionPath);
+    // initialize metadata reader
+    initMetadataReader();
+    long totalInitTime = partitionInitTimer.endTimer();
+    LOG.info("Initializing {} index in metadata table took {} in ms", 
partitionType, totalInitTime);

Review Comment:
   πŸ€– nit: `@SneakyThrows` silently swallows the checked `IOException` contract 
from `clearExistingMetadataPartition`, making it invisible to callers. Could 
you declare `throws IOException` on this method instead and let the exception 
propagate explicitly?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
         partitionInfoList = Collections.emptyList();
       }
     }
-    Map<String, Map<String, Long>> partitionIdToAllFilesMap = 
partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
-          return Pair.of(partitionName, p.getFilenameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    // validate that each index is eligible to be initialized
-    Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
-    while (iterator.hasNext()) {
-      MetadataPartitionType partitionType = iterator.next();
-      if (partitionType == PARTITION_STATS && 
!dataMetaClient.getTableConfig().isTablePartitioned()) {
-        // Partition stats index cannot be enabled for a non-partitioned table
-        iterator.remove();
-        this.enabledPartitionTypes.remove(partitionType);
-      }
+    Map<String, List<FileInfo>> partitionIdToAllFilesMap = 
DirectoryInfo.getPartitionToFileInfo(partitionInfoList);
+    Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = 
getLazyLatestMergedPartitionFileSliceList();
+
+    // FILES partition should always be initialized first if enabled
+    if (!filesPartitionAvailable) {
+      initializeMetadataPartition(FILES, 
indexerMapForPartitionsToInit.get(FILES),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
+      hasPartitionsStateChanged = true;
     }
 
-    // For a fresh table, defer RLI initialization
-    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() 
&& this.enabledPartitionTypes.contains(RECORD_INDEX)
-        && 
dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 
0) {
-      this.enabledPartitionTypes.remove(RECORD_INDEX);
-      partitionsToInit.remove(RECORD_INDEX);
-    }
-
-    Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList 
= getLazyLatestMergedPartitionFileSliceList();
-    for (MetadataPartitionType partitionType : partitionsToInit) {
-      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
-      String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
-      String partitionTypeName = partitionType.name();
-      LOG.info("Initializing MDT partition {} at instant {}", 
partitionTypeName, instantTimeForPartition);
-      String relativePartitionPath;
-      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
-      Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
-      try {
-        switch (partitionType) {
-          case FILES:
-            fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
FILES.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case BLOOM_FILTERS:
-            fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case COLUMN_STATS:
-            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
colStatsColumnsAndRecord = 
initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema);
-            fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
-            initializeFilegroupsAndCommit(partitionType, 
COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition, colStatsColumnsAndRecord.getKey());
-            break;
-          case RECORD_INDEX:
-            boolean isPartitionedRLI = 
dataWriteConfig.isRecordLevelIndexEnabled();
-            
initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, 
lazyLatestMergedPartitionFileSliceList, isPartitionedRLI);
-            break;
-          case EXPRESSION_INDEX:
-            Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (expressionIndexPartitionsToInit.size() != 1) {
-              if (expressionIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping expression index initialization as only one 
expression index bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
expressionIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, 
lazyLatestMergedPartitionFileSliceList, tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          case PARTITION_STATS:
-            // For PARTITION_STATS, COLUMN_STATS should also be enabled
-            if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
-              LOG.debug("Skipping partition stats initialization as column 
stats index is not enabled. Please enable {}",
-                  
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
-              continue;
-            }
-            fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, 
tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case SECONDARY_INDEX:
-            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (secondaryIndexPartitionsToInit.size() != 1) {
-              if (secondaryIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
secondaryIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeSecondaryIndexPartition(relativePartitionPath, 
lazyLatestMergedPartitionFileSliceList);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          default:
-            throw new HoodieMetadataException(String.format("Unsupported MDT 
partition type: %s", partitionType));
-        }
-      } catch (Exception e) {
-        String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
-        metrics.ifPresent(m -> m.setMetric(metricKey, 1));
-        String errMsg = String.format("Bootstrap on %s partition failed for 
%s",
-            partitionType.getPartitionPath(), 
metadataMetaClient.getBasePath());
-        LOG.error(errMsg, e);
-        throw new HoodieMetadataException(errMsg, e);
-      }
+    for (Map.Entry<MetadataPartitionType, Indexer> entry :
+        indexerMapForPartitionsToInit.entrySet().stream()
+            .filter(e -> e.getKey() != FILES).collect(Collectors.toList())) {
+      initializeMetadataPartition(entry.getKey(), entry.getValue(),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
       hasPartitionsStateChanged = true;
     }
     return true;
   }
 
+  @SneakyThrows
+  private void initializeMetadataPartition(
+      MetadataPartitionType partitionType,
+      Indexer indexer,
+      String dataTableInstantTime,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) {
+    String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
+    // initialize metadata partitions
+    List<IndexPartitionInitialization> initializationList;
+    try {
+      initializationList = indexer.buildInitialization(
+          dataTableInstantTime, instantTimeForPartition, 
partitionToAllFilesMap, lazyLatestMergedPartitionFileSliceList);
+    } catch (Exception e) {
+      String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
+      metrics.ifPresent(m -> m.setMetric(metricKey, 1));
+      String errMsg = String.format("Bootstrap on %s partition failed for %s",
+          partitionType.getPartitionPath(), metadataMetaClient.getBasePath());
+      LOG.error(errMsg, e);
+      throw new HoodieMetadataException(errMsg, e);
+    }
+
+    if (initializationList.isEmpty()) {
+      LOG.info("Skip building {} index in metadata table", 
partitionType.getPartitionPath());
+      return;
+    }
+
+    ValidationUtils.checkArgument(initializationList.size() == 1,
+        "Only support the initialization of one partition per index type "
+            + "(HUDI-9358 for the feature support)");
+
+    IndexPartitionInitialization initialIndexPartitionData = 
initializationList.get(0);
+    final int numFileGroup = initialIndexPartitionData.totalFileGroups();
+    String relativePartitionPath = 
initialIndexPartitionData.indexPartitionName();
+    LOG.info("Initializing {} index with {} file groups", 
relativePartitionPath, numFileGroup);
+
+    HoodieTimer partitionInitTimer = HoodieTimer.start();
+    clearExistingMetadataPartition(relativePartitionPath);
+    HoodieData<HoodieRecord> records = engineContext.emptyHoodieData();
+    for (DataPartitionAndRecords dataPartitionAndRecords: 
initialIndexPartitionData.dataPartitionAndRecords()) {

Review Comment:
   πŸ€– nit: collecting the entry set to a list just to iterate it feels a bit 
roundabout. Have you considered using `.entrySet().stream().filter(e -> 
e.getKey() != FILES).forEach(e -> initializeMetadataPartition(...))` to avoid 
the intermediate allocation?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
         partitionInfoList = Collections.emptyList();
       }
     }
-    Map<String, Map<String, Long>> partitionIdToAllFilesMap = 
partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
-          return Pair.of(partitionName, p.getFilenameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    // validate that each index is eligible to be initialized
-    Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
-    while (iterator.hasNext()) {
-      MetadataPartitionType partitionType = iterator.next();
-      if (partitionType == PARTITION_STATS && 
!dataMetaClient.getTableConfig().isTablePartitioned()) {
-        // Partition stats index cannot be enabled for a non-partitioned table
-        iterator.remove();
-        this.enabledPartitionTypes.remove(partitionType);
-      }
+    Map<String, List<FileInfo>> partitionIdToAllFilesMap = 
DirectoryInfo.getPartitionToFileInfo(partitionInfoList);
+    Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = 
getLazyLatestMergedPartitionFileSliceList();
+
+    // FILES partition should always be initialized first if enabled
+    if (!filesPartitionAvailable) {
+      initializeMetadataPartition(FILES, 
indexerMapForPartitionsToInit.get(FILES),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
+      hasPartitionsStateChanged = true;
     }
 
-    // For a fresh table, defer RLI initialization
-    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() 
&& this.enabledPartitionTypes.contains(RECORD_INDEX)
-        && 
dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 
0) {
-      this.enabledPartitionTypes.remove(RECORD_INDEX);
-      partitionsToInit.remove(RECORD_INDEX);
-    }
-
-    Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList 
= getLazyLatestMergedPartitionFileSliceList();
-    for (MetadataPartitionType partitionType : partitionsToInit) {
-      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
-      String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
-      String partitionTypeName = partitionType.name();
-      LOG.info("Initializing MDT partition {} at instant {}", 
partitionTypeName, instantTimeForPartition);
-      String relativePartitionPath;
-      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
-      Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
-      try {
-        switch (partitionType) {
-          case FILES:
-            fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
FILES.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case BLOOM_FILTERS:
-            fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case COLUMN_STATS:
-            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
colStatsColumnsAndRecord = 
initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema);
-            fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
-            initializeFilegroupsAndCommit(partitionType, 
COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition, colStatsColumnsAndRecord.getKey());
-            break;
-          case RECORD_INDEX:
-            boolean isPartitionedRLI = 
dataWriteConfig.isRecordLevelIndexEnabled();
-            
initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, 
lazyLatestMergedPartitionFileSliceList, isPartitionedRLI);
-            break;
-          case EXPRESSION_INDEX:
-            Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (expressionIndexPartitionsToInit.size() != 1) {
-              if (expressionIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping expression index initialization as only one 
expression index bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
expressionIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, 
lazyLatestMergedPartitionFileSliceList, tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          case PARTITION_STATS:
-            // For PARTITION_STATS, COLUMN_STATS should also be enabled
-            if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
-              LOG.debug("Skipping partition stats initialization as column 
stats index is not enabled. Please enable {}",
-                  
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
-              continue;
-            }
-            fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, 
tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case SECONDARY_INDEX:
-            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (secondaryIndexPartitionsToInit.size() != 1) {
-              if (secondaryIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
secondaryIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeSecondaryIndexPartition(relativePartitionPath, 
lazyLatestMergedPartitionFileSliceList);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          default:
-            throw new HoodieMetadataException(String.format("Unsupported MDT 
partition type: %s", partitionType));
-        }
-      } catch (Exception e) {
-        String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
-        metrics.ifPresent(m -> m.setMetric(metricKey, 1));
-        String errMsg = String.format("Bootstrap on %s partition failed for 
%s",
-            partitionType.getPartitionPath(), 
metadataMetaClient.getBasePath());
-        LOG.error(errMsg, e);
-        throw new HoodieMetadataException(errMsg, e);
-      }
+    for (Map.Entry<MetadataPartitionType, Indexer> entry :
+        indexerMapForPartitionsToInit.entrySet().stream()
+            .filter(e -> e.getKey() != FILES).collect(Collectors.toList())) {
+      initializeMetadataPartition(entry.getKey(), entry.getValue(),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
       hasPartitionsStateChanged = true;
     }
     return true;
   }
 
+  @SneakyThrows
+  private void initializeMetadataPartition(
+      MetadataPartitionType partitionType,
+      Indexer indexer,
+      String dataTableInstantTime,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) {
+    String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
+    // initialize metadata partitions
+    List<IndexPartitionInitialization> initializationList;
+    try {
+      initializationList = indexer.buildInitialization(
+          dataTableInstantTime, instantTimeForPartition, 
partitionToAllFilesMap, lazyLatestMergedPartitionFileSliceList);
+    } catch (Exception e) {
+      String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
+      metrics.ifPresent(m -> m.setMetric(metricKey, 1));
+      String errMsg = String.format("Bootstrap on %s partition failed for %s",
+          partitionType.getPartitionPath(), metadataMetaClient.getBasePath());
+      LOG.error(errMsg, e);
+      throw new HoodieMetadataException(errMsg, e);
+    }
+
+    if (initializationList.isEmpty()) {
+      LOG.info("Skip building {} index in metadata table", 
partitionType.getPartitionPath());
+      return;

Review Comment:
   πŸ€– nit: `initialIndexPartitionData` reads as "the first index partition data" 
rather than "the initialization data for an index partition". Something like 
`partitionInit` or `indexPartitionInit` might be less ambiguous.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -319,12 +321,11 @@ public static ExpressionIndexComputationMetadata 
getExprIndexRecords(
   }
 
   private static Iterator<Row> 
getExpressionIndexRecordsIterator(HoodieReaderContext<InternalRow> 
readerContext, HoodieTableMetaClient metaClient,
-                                                                 HoodieSchema 
tableSchema, HoodieSchema readerSchema, HoodieWriteConfig dataWriteConfig, 
Pair<String, Pair<String, Long>> entry) {
-    String partition = entry.getKey();
-    Pair<String, Long> filePathSizePair = entry.getValue();
-    String filePath = filePathSizePair.getKey();
+                                                                 HoodieSchema 
tableSchema, HoodieSchema readerSchema, HoodieWriteConfig dataWriteConfig, 
FileInfoAndPartition entry) {
+    String partition = entry.partitionPath();

Review Comment:
   πŸ€– nit: `entry.name()` is assigned to a variable called `filePath`, 
suggesting it returns a full path rather than just a file name β€” could you 
rename the accessor to `filePath()` or `path()` on `FileInfoAndPartition` to 
match what it actually holds?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/Indexer.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.util.Lazy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface to building records for initializing and updating a type of 
metadata or index
+ * in the metadata table.
+ * <p>
+ * When a new type of index is added to MetadataPartitionType, an
+ * implementation of the {@link Indexer} interface is required, and it
+ * must be added to {@link IndexerFactory}.
+ */
+public interface Indexer {
+  /**
+   * Generates records for initializing the index.
+   *
+   * @param dataTableInstantTime                   instant time of the data 
table that the metadata table is initialized on
+   * @param instantTimeForPartition                instant time used for 
initializing a specific metadata partition
+   * @param partitionToAllFilesMap                 map of partition to files
+   * @param lazyLatestMergedPartitionFileSliceList lazily-evaluated list of 
file slices for the indexer that needs it
+   * @return zero or more {@link IndexPartitionInitialization} entries to be 
initialized.
+   * Returning an empty list means no metadata partition needs initialization 
in this invocation.
+   * @throws IOException upon IO error
+   */
+  List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) throws IOException;

Review Comment:
   πŸ€– nit: `lazyLatestMergedPartitionFileSliceList` is quite a mouthful β€” 
something like `lazyPartitionFileSlices` carries the same meaning with less 
visual noise.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/expression/TestExpressionIndexer.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
+ */
+
+package org.apache.hudi.metadata.index.expression;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.util.Lazy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class TestExpressionIndexer {
+
+  @Test
+  void testSkipWhenMultipleExpressionPartitions() throws IOException {
+    HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = 
mockStatic(HoodieTableMetadataUtil.class)) {
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit(any(), any(), any()))
+          .thenReturn(Set.of("expr1", "expr2"));
+
+      ExposedExpressionIndexer indexer = new 
ExposedExpressionIndexer(engineContext, writeConfig, metaClient, 
mock(ExpressionIndexRecordGenerator.class));
+      List<IndexPartitionInitialization> result = indexer.callGetData("001", 
"002", Collections.emptyMap(), Lazy.lazily(Collections::emptyList));
+      assertTrue(result.isEmpty());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  void testInitializeWithRealEngineContextAndIndexDataContent() throws 
IOException {
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf());
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    ExpressionIndexRecordGenerator generator = 
mock(ExpressionIndexRecordGenerator.class);
+    HoodieIndexDefinition definition = mock(HoodieIndexDefinition.class);
+    HoodieSchema tableSchema = mock(HoodieSchema.class);
+    HoodieSchema projectedSchema = mock(HoodieSchema.class);
+
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(metadataConfig.getExpressionIndexFileGroupCount()).thenReturn(6);
+    when(metadataConfig.getExpressionIndexParallelism()).thenReturn(10);
+
+    HoodieData<HoodieRecord> records = (HoodieData<HoodieRecord>) 
(HoodieData<?>) engineContext.parallelize(
+        
Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p_expr",
+            Collections.singletonMap("f_expr.parquet", 55L), 
Collections.emptyList())),
+        1);
+    when(generator.generate(any(), any(), any(), anyInt(), any(), any(), 
any(), any())).thenReturn(records);
+
+    FileSlice fileSlice = new FileSlice("p1", "001", "f1");
+    fileSlice.setBaseFile(new HoodieBaseFile("file:///tmp/p1/f1.parquet"));
+    List<FileSliceAndPartition> fileSlices = 
Collections.singletonList(FileSliceAndPartition.of("p1", fileSlice));
+
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = 
mockStatic(HoodieTableMetadataUtil.class)) {
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit(any(), any(), any()))
+          .thenReturn(Collections.singleton("expr_idx"));
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.getHoodieIndexDefinition("expr_idx", 
metaClient)).thenReturn(definition);
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(metaClient)).thenReturn(Option.of(tableSchema));
+      mockedUtil.when(() -> 
HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex(definition, 
metaClient, tableSchema)).thenReturn(projectedSchema);
+
+      ExposedExpressionIndexer indexer = new 
ExposedExpressionIndexer(engineContext, writeConfig, metaClient, generator);
+      List<IndexPartitionInitialization> initializationList = 
indexer.callGetData("001", "002", new HashMap<>(), Lazy.lazily(() -> 
fileSlices));
+      assertEquals(1, initializationList.size());
+
+      assertEquals("expr_idx", initializationList.get(0).indexPartitionName());
+      List<HoodieRecord> collected = 
initializationList.get(0).dataPartitionAndRecords().get(0).indexRecords().collectAsList();
+      assertEquals(1, collected.size());
+      assertEquals("p_expr", collected.get(0).getRecordKey());
+    }
+  }
+
+  private static class ExposedExpressionIndexer extends ExpressionIndexer {
+    ExposedExpressionIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig,
+                             HoodieTableMetaClient dataTableMetaClient, 
ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+      super(engineContext, dataTableWriteConfig, dataTableMetaClient, 
expressionIndexRecordGenerator);
+    }
+
+    List<IndexPartitionInitialization> callGetData(String 
dataTableInstantTime, String instantTimeForPartition,

Review Comment:
   πŸ€– nit: `callGetData` doesn't quite describe what's happening β€” it actually 
delegates to `buildInitialization`. Something like `callBuildInitialization` 
(or just calling `buildInitialization` directly, since it's protected) would 
make the intent clearer. Same pattern appears in `TestFilesIndexer`, 
`TestPartitionStatsIndexer`, `TestRecordIndexer`, and `TestSecondaryIndexer`.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexTableVersionSix.scala:
##########
@@ -18,15 +18,49 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.MetadataPartitionType
 
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Tag
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.Collections
 
 @Tag("functional-b")
 class TestGlobalRecordLevelIndexTableVersionSix extends 
TestGlobalRecordLevelIndex {
   override def commonOpts: Map[String, String] = super.commonOpts ++ Map(
     HoodieTableConfig.VERSION.key() -> "6",
     HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "6"
   )
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  override def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name(),
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true")
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+
+    val writeConfig = getWriteConfig(hudiOpts)
+    writeConfig.setSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+    
getHoodieWriteClient(writeConfig).dropIndex(Collections.singletonList(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
+    assertEquals(0, getFileGroupCountForRecordIndex(writeConfig))
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(0, 
metaClient.getTableConfig.getMetadataPartitionsInflight.size())

Review Comment:
   πŸ€– nit: "partition stats partition" reads oddly β€” looks like a duplicate 
word. And since the assertion below checks for size 2, the comment might mean 
"only files and col stats partitions should be present"?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -453,8 +454,25 @@ public static Set<String> getAllPartitionPaths() {
    */
   public static MetadataPartitionType[] getValidValues() {

Review Comment:
   πŸ€– nit: the `// ALL_PARTITIONS is just another record type in FILES 
partition` comment now lives in the no-arg stub rather than in the overload 
that actually contains the filtering logic β€” might be worth moving it down to 
`getValidValues(HoodieTableVersion)` where it still describes something 
meaningful.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -4089,20 +4089,20 @@ private void validateMetadata(SparkRDDWriteClient 
testClient, Option<String> ign
     List<String> metadataTablePartitions = 
FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient, false);
     // Secondary index is enabled by default but no MDT partition 
corresponding to it is available
     final boolean isPartitionStatsEnabled;
-    if (!metadataWriter.getEnabledPartitionTypes().contains(COLUMN_STATS)) {
+    if (!metadataWriter.getEnabledIndexerMap().containsKey(COLUMN_STATS)) {

Review Comment:
   πŸ€– nit: the if/else block could collapse into a single line β€” `final boolean 
isPartitionStatsEnabled = 
metadataWriter.getEnabledIndexerMap().containsKey(COLUMN_STATS);`



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/index/record/TestPartitionedRecordIndexer.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.util.Lazy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class TestPartitionedRecordIndexer {
+
+  @SuppressWarnings("unchecked")
+  @Test
+  void testInitializeWithRealEngineContextAndIndexDataContent() throws 
IOException {
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(getDefaultStorageConf());
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+    HoodieTableMetaClient dataMetaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+    when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+    when(metadataConfig.getRecordIndexMaxParallelism()).thenReturn(8);
+    when(dataMetaClient.getTableConfig()).thenReturn(tableConfig);
+
+    HoodieData<HoodieRecord> p1Data = (HoodieData<HoodieRecord>) 
(HoodieData<?>) engineContext.parallelize(
+        
Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p1_data",
+            Collections.singletonMap("f1.parquet", 1L), 
Collections.emptyList())),
+        1);
+    HoodieData<HoodieRecord> p2Data = (HoodieData<HoodieRecord>) 
(HoodieData<?>) engineContext.parallelize(
+        
Collections.singletonList(HoodieMetadataPayload.createPartitionFilesRecord("p2_data",
+            Collections.singletonMap("f2.parquet", 2L), 
Collections.emptyList())),
+        1);
+
+    DataPartitionAndRecords p1Init = new DataPartitionAndRecords(1, 
Option.of("p1"), p1Data);
+    DataPartitionAndRecords p2Init = new DataPartitionAndRecords(2, 
Option.of("p2"), p2Data);
+
+    FileSliceAndPartition fs1 = FileSliceAndPartition.of("p1", new 
FileSlice("p1", "001", "f1"));
+    FileSliceAndPartition fs2 = FileSliceAndPartition.of("p2", new 
FileSlice("p2", "001", "f2"));
+    List<FileSliceAndPartition> input = Arrays.asList(fs1, fs2);
+
+    ExposedPartitionedRecordIndexer indexer = new 
ExposedPartitionedRecordIndexer(engineContext, writeConfig, dataMetaClient, 
p1Init, p2Init);
+
+    try (MockedStatic<org.apache.hudi.metadata.HoodieTableMetadataUtil> 
mockedUtil = 
mockStatic(org.apache.hudi.metadata.HoodieTableMetadataUtil.class)) {

Review Comment:
   πŸ€– nit: `org.apache.hudi.metadata.HoodieTableMetadataUtil` is repeated as a 
fully-qualified name in both the `try` header and the `verify` call (line 76) β€” 
could you add an import and use the simple name instead? The other test files 
already import it this way.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/index/SparkExpressionIndexRecordGenerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Spark implementation of {@link ExpressionIndexRecordGenerator}.

Review Comment:
   πŸ€– nit: `@Slf4j` is declared but `log` doesn't appear to be used anywhere in 
this class β€” worth removing it to keep the class tidy, or adding a log 
statement if one was intended.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/DirectoryInfo.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
+import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
+
+/**
+ * A class which represents a directory and the files and directories inside 
it.
+ * <p>
+ * A {@code PartitionFileInfo} object saves the name of the partition and 
various properties requires of each file
+ * required for initializing the metadata table. Saving limited properties 
reduces the total memory footprint when

Review Comment:
   πŸ€– nit: the Javadoc references `PartitionFileInfo` but the class is 
`DirectoryInfo` β€” looks like a stale copy-paste. Could you update it to match?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to