suryaprasanna commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1225448478


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java:
##########
@@ -106,10 +107,10 @@ public Map<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(List<St
     //       materialized state, to make sure there's no concurrent access
     synchronized (this) {
       logRecordScanner.scanByFullKeys(keys);
-      return logRecordScanner.getRecords().entrySet()
-          .stream()
+      Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();

Review Comment:
   What is the fix here? Previously we iterated over the entrySet; is there a 
reason the iteration was changed to keySet?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -99,7 +90,25 @@ public interface HoodieTableMetadataWriter extends 
Serializable, AutoCloseable {
    * Deletes the given metadata partitions. This path reuses DELETE_PARTITION 
operation.
    *
    * @param instantTime - instant time when replacecommit corresponding to the 
drop will be recorded in the metadata timeline
-   * @param partitions - list of {@link MetadataPartitionType} to drop
+   * @param partitions  - list of {@link MetadataPartitionType} to drop
    */
   void deletePartitions(String instantTime, List<MetadataPartitionType> 
partitions);
+
+  /**
+   * It returns write client for metadata table.
+   */
+  BaseHoodieWriteClient getWriteClient();

Review Comment:
   This comment is addressed as part of 
[PR-8684](https://github.com/apache/hudi/pull/8684)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -99,7 +90,25 @@ public interface HoodieTableMetadataWriter extends 
Serializable, AutoCloseable {
    * Deletes the given metadata partitions. This path reuses DELETE_PARTITION 
operation.
    *
    * @param instantTime - instant time when replacecommit corresponding to the 
drop will be recorded in the metadata timeline
-   * @param partitions - list of {@link MetadataPartitionType} to drop
+   * @param partitions  - list of {@link MetadataPartitionType} to drop
    */
   void deletePartitions(String instantTime, List<MetadataPartitionType> 
partitions);
+
+  /**
+   * It returns write client for metadata table.
+   */
+  BaseHoodieWriteClient getWriteClient();
+
+  /**
+   * Returns true if the metadata table is initialized.
+   */
+  boolean isInitialized();

Review Comment:
   This comment is addressed as part of 
[PR-8684](https://github.com/apache/hudi/pull/8684)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -147,84 +156,55 @@ protected <T extends SpecificRecordBase> 
HoodieBackedTableMetadataWriter(Configu
     this.metrics = Option.empty();
     this.enabledPartitionTypes = new ArrayList<>();
 
-    if (writeConfig.isMetadataTableEnabled()) {
-      this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
-      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, 
failedWritesCleaningPolicy);
-      enabled = true;
-
-      // Inline compaction and auto clean is required as we do not expose this 
table outside
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(),
-          "Cleaning is controlled internally for Metadata table.");
-      
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
-          "Compaction is controlled internally for metadata table.");
-      // Auto commit is required
-      
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
-          "Auto commit is required for Metadata Table");
-      
ValidationUtils.checkArgument(this.metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()),
-          "MDT should use " + FailOnFirstErrorWriteStatus.class.getName());
-      // Metadata Table cannot have metadata listing turned on. (infinite 
loop, much?)
-      
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
-          "File listing cannot be used for Metadata Table");
-
-      this.dataMetaClient =
-          
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
-      enablePartitions();
-      initRegistry();
-      initialize(engineContext, actionMetadata, inflightInstantTimestamp);
-      initTableMetadata();
-    } else {
-      enabled = false;
+    this.dataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
+
+    if (dataMetaClient.getTableConfig().isMetadataTableEnabled() || 
writeConfig.isMetadataTableEnabled()) {

Review Comment:
   Once the metadata table is enabled, the setting is stored in 
hoodie.properties, so it becomes a table property. That means writers still go 
through the metadata table even if their metadata enable flag is set to false. 
Once initialized, the setting is promoted to a table property so that readers, 
which do not have the write configs, can also use this feature.
   To disable the metadata table, first delete it using the metadata delete 
command in hudi-cli, and then set the metadata enable flag to false in the 
writer configs.
   This was done defensively so that accidentally passing a wrong config cannot 
delete the record_index and other metadata partitions of the table.
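
   A minimal sketch of the resulting enablement rule, for illustration only. 
The class and method names (`MetadataEnablement`, `shouldUseMetadataTable`) are 
hypothetical, and plain booleans stand in for the table property read from 
hoodie.properties and the writer's enable flag:

```java
public class MetadataEnablement {
    // Hypothetical helper (not part of Hudi): the metadata table is used when
    // either the persisted table property or the writer config enables it.
    public static boolean shouldUseMetadataTable(boolean enabledInTableConfig,
                                                 boolean enabledInWriteConfig) {
        return enabledInTableConfig || enabledInWriteConfig;
    }

    public static void main(String[] args) {
        // Table already initialized, writer passes "false" by mistake: still used.
        System.out.println(shouldUseMetadataTable(true, false));
        // Never initialized and not requested by the writer: not used.
        System.out.println(shouldUseMetadataTable(false, false));
    }
}
```

   With this rule a writer-side flag alone cannot turn off an initialized 
metadata table; the table property has to be cleared first.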



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -726,25 +832,18 @@ private interface ConvertMetadataFunction {
   /**
    * Processes commit metadata from data table and commits to metadata table.
    *
-   * @param instantTime instant time of interest.
+   * @param instantTime             instant time of interest.
    * @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
-   * @param <T> type of commit metadata.
-   * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
-  private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (!dataWriteConfig.isMetadataTableEnabled()) {
-      return;
-    }
+  private void processAndCommit(String instantTime, ConvertMetadataFunction 
convertMetadataFunction) {

Review Comment:
   This is no longer needed, since table services are now performed during 
construction of the metadata writer.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)

Review Comment:
   Agreed. The record index is currently supported for COW table types; we can 
extend it to the MOR table type in another patch.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -370,11 +336,10 @@ private <T extends SpecificRecordBase> boolean 
isBootstrapNeeded(Option<HoodieIn
     }
 
     // Detect the commit gaps if any from the data and the metadata active 
timeline
-    if 
(dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
-        latestMetadataInstant.get().getTimestamp())
+    if 
(dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstantTimestamp)

Review Comment:
   This code path is only executed if reinitialization is needed.
   The first if condition in this method is triggered when the metadata table 
has been created but someone accidentally deleted all the instants in the 
metadata table timeline. In those cases the metadata table needs to be 
reinitialized.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        initMetadataReader();
+        return true;
+      }
+
+      // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);

Review Comment:
   We should also add a test case for this.
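
   A rough sketch of the two cases such a test could cover. The class, method, 
and constant names are hypothetical, and the constant's value is a placeholder 
for illustration rather than a statement about `SOLO_COMMIT_TIMESTAMP`:

```java
import java.util.Optional;

public class InitializationTime {
    // Placeholder standing in for SOLO_COMMIT_TIMESTAMP; the value is assumed.
    public static final String SOLO_PLACEHOLDER = "00000000000000";

    // Mirrors the fallback in the diff: use the latest completed instant if
    // there is one, otherwise fall back to the solo commit timestamp.
    public static String chooseInitializationTime(Optional<String> lastCompletedInstant) {
        return lastCompletedInstant.orElse(SOLO_PLACEHOLDER);
    }

    public static void main(String[] args) {
        // Case 1: no completed commit on the dataset yet.
        System.out.println(chooseInitializationTime(Optional.empty()));
        // Case 2: a completed instant exists and is used as-is.
        System.out.println(chooseInitializationTime(Optional.of("20230609120000")));
    }
}
```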



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          .map(basefile -> Pair.of(partition, 
basefile.getFileName())).collect(Collectors.toList()));
+    }
 
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() 
+ " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = 
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
 recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, 
dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), 
dataWriteConfig.getRecordIndexGrowthFactor(),
+        dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+    LOG.info(String.format("Initializing record index with %d mappings and %d 
file groups.", recordCount, fileGroupCount));
+    return Pair.of(fileGroupCount, records);
+  }
+
+  /**
+   * Read the record keys from base files in partitions and return records.
+   */
+  private HoodieData<HoodieRecord> 
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+      List<Pair<String, String>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: 
reading record keys from base files");
+    return engineContext.parallelize(partitionBaseFilePairs, 
partitionBaseFilePairs.size()).flatMap(p -> {
+      final String partition = p.getKey();
+      final String filename = p.getValue();
+      Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + 
Path.SEPARATOR + filename);
+
+      final String fileId = FSUtils.getFileId(filename);
+      final String instantTime = FSUtils.getCommitTime(filename);
+      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(),
 dataFilePath);

Review Comment:
   Yes, I wrapped the statement in a try block.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -170,6 +171,17 @@ public static HoodieWriteConfig createMetadataWriteConfig(
           throw new HoodieMetadataException("Unsupported Metrics Reporter type 
" + writeConfig.getMetricsReporterType());
       }
     }
-    return builder.build();
+
+    HoodieWriteConfig metadataWriteConfig = builder.build();
+    // Inline compaction and auto clean is required as we do not expose this 
table outside
+    ValidationUtils.checkArgument(!metadataWriteConfig.isAutoClean(), 
"Cleaning is controlled internally for Metadata table.");

Review Comment:
   This comment is addressed as part of 
[PR-8684](https://github.com/apache/hudi/pull/8684) 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -18,14 +18,19 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;

Review Comment:
   Already addressed.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        initMetadataReader();
+        return true;
+      }
+
+      // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+      // Initialize partitions for the first time using data from the files on 
the file system
+      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
+        LOG.error("Failed to initialize MDT from filesystem");
+        return false;
       }
 
-      String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-      initTableMetadata(); // re-init certain flags in BaseTableMetadata
-      initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
partitionsToInit);
-      initialCommit(createInstantTime, partitionsToInit);
-      updateInitializedPartitionsInTableConfig(partitionsToInit);
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      return true;
+    } catch (IOException e) {
+      LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
+      return false;
     }
   }
 
   private <T extends SpecificRecordBase> boolean 
metadataTableExists(HoodieTableMetaClient dataMetaClient,
                                                                      Option<T> 
actionMetadata) throws IOException {
-    boolean exists = dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(),
-        HoodieTableMetaClient.METAFOLDER_NAME));
+    boolean exists = dataMetaClient.getTableConfig().isMetadataTableEnabled();
     boolean reInitialize = false;
 
     // If the un-synced instants have been archived, then
     // the metadata table will need to be initialized again.
     if (exists) {
-      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
-          .setBasePath(metadataWriteConfig.getBasePath()).build();
-
-      if (DEFAULT_METADATA_POPULATE_META_FIELDS != 
metadataMetaClient.getTableConfig().populateMetaFields()) {
-        LOG.info("Re-initiating metadata table properties since populate meta 
fields have changed");
-        metadataMetaClient = 
initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
+      try {
+        metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+        if (DEFAULT_METADATA_POPULATE_META_FIELDS != 
metadataMetaClient.getTableConfig().populateMetaFields()) {
+          LOG.info("Re-initiating metadata table properties since populate 
meta fields have changed");
+          metadataMetaClient = initializeMetaClient();
+        }
+      } catch (TableNotFoundException e) {
+        // Table not found, initialize the metadata table.
+        metadataMetaClient = initializeMetaClient();

Review Comment:
   Since we are in this method, the data table's hoodie properties say that the 
metadata table is enabled. The first attempt, creating the metaclient object 
with the builder, is guarded by a try block. If that attempt fails, the catch 
block tries once more by initializing the metaclient, to check whether the 
table is still missing. If this fails again, the error is thrown automatically.
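
   A minimal sketch of the fallback described above, using hypothetical 
stand-in names (`MetaClientFallbackSketch`, `TableMissingException`, 
`loadOrInitialize`) rather than the actual Hudi classes. It only illustrates 
the control flow: try to load first, initialize once if the table is missing, 
and let a second failure propagate.

   ```java
   import java.util.function.Supplier;

   // Hypothetical stand-ins for illustration; none of these names are Hudi classes.
   final class MetaClientFallbackSketch {

     /** Stand-in for the "table not found" error raised by the loader. */
     static final class TableMissingException extends RuntimeException {
       TableMissingException(String message) {
         super(message);
       }
     }

     /**
      * Tries to load an existing client first. If the table is missing, falls back
      * to initializing it once. A failure inside the initializer is not caught here,
      * so it propagates to the caller.
      */
     static <T> T loadOrInitialize(Supplier<T> loader, Supplier<T> initializer) {
       try {
         return loader.get();
       } catch (TableMissingException e) {
         // Table not found: initialize instead of failing right away.
         return initializer.get();
       }
     }
   }
   ```

   Only the "not found" case is retried here, so any other error from the 
loader still surfaces unchanged.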



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
    */
-  private void initialCommit(String createInstantTime, 
List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap 
= new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh 
table.
-    // this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the 
initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = 
listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-      }
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+    // we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
+    // Whenever you want to change this logic, please ensure all below 
scenarios are considered.
+    // a. There could be a chance that latest delta commit in MDT is committed 
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
+    // b. There could be DT inflights after latest delta commit in MDT and we 
are ok with it. bcoz, the contract is, latest compaction instant time in MDT 
represents
+    // any instants before that is already synced with metadata table.
+    // c. Do consider out of order commits. For eg, c4 from DT could complete 
before c3. and we can't trigger compaction in MDT with c4 as base instant time, 
until every
+    // instant before c4 is synced with metadata table.
+    List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
 
-      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-      }
-      LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+    if (!pendingInstants.isEmpty()) {
+      checkNumDeltaCommits(metadataMetaClient, 
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+      LOG.info(String.format(
+          "Cannot compact metadata table as there are %d inflight instants in 
data table before latest deltacommit in metadata table: %s. Inflight instants 
in data table: %s",
+          pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, 
Arrays.toString(pendingInstants.toArray())));
+      return false;
     }
 
-    commit(createInstantTime, partitionToRecordsMap, false);
+    return true;
   }
 
-  private HoodieData<HoodieRecord> getFilesPartitionRecords(String 
createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord 
allPartitionRecord) {
-    HoodieData<HoodieRecord> filesPartitionRecords = 
engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
-    if (partitionInfoList.isEmpty()) {
-      return filesPartitionRecords;
-    }
+  /**
+   * Return records that represent update to the record index due to write 
operation on the dataset.
+   *
+   * @param writeStatuses {@code WriteStatus} from the write operation
+   */
+  private HoodieData<HoodieRecord> 
getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
+    return writeStatuses.flatMap(writeStatus -> {
+      List<HoodieRecord> recordList = new LinkedList<>();
+      for (HoodieRecord writtenRecord : writeStatus.getWrittenRecords()) {
+        if (!writeStatus.isErrored(writtenRecord.getKey())) {
+          HoodieRecord hoodieRecord;
+          HoodieKey key = writtenRecord.getKey();
+          Option<HoodieRecordLocation> newLocation = 
writtenRecord.getNewLocation();
+          if (newLocation.isPresent()) {
+            if (writtenRecord.getCurrentLocation() != null) {
+              // This is an update, no need to update index if the location 
has not changed
+              // newLocation should have the same fileID as currentLocation. 
The instantTimes differ as newLocation's
+              // instantTime refers to the current commit which was completed.
+              if 
(!writtenRecord.getCurrentLocation().getFileId().equals(newLocation.get().getFileId()))
 {
+                final String msg = String.format("Detected update in location 
of record with key %s from %s "
+                        + " to %s. The fileID should not change.",
+                    writtenRecord.getKey(), 
writtenRecord.getCurrentLocation(), newLocation.get());
+                LOG.error(msg);
+                throw new HoodieMetadataException(msg);
+              } else {
+                // TODO: This may be required for clustering usecases where 
record location changes

Review Comment:
   Currently, updates to the record index are not allowed. Cases where records 
move from one file group to another will be affected. We will need a follow-up 
to address this case.
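
   A minimal sketch of the rule as it stands, with hypothetical names 
(`RecordIndexUpdateSketch`, `needsIndexEntry`) and plain strings in place of 
the Hudi location types. It assumes that a record without a current location 
is an insert and needs a new index entry. That branch is not visible in the 
quoted hunk, so treat it as an assumption.

   ```java
   import java.util.Objects;

   // Hypothetical sketch; the class, method, and parameters are illustrative, not Hudi APIs.
   final class RecordIndexUpdateSketch {

     /**
      * Decides whether a written record needs a record-index entry.
      *
      * Returns true for an insert (no current location; assumed behavior),
      * returns false for an update that stays in the same file group, and
      * throws when the file group changes, since that case is not supported yet.
      */
     static boolean needsIndexEntry(String recordKey, String currentFileId, String newFileId) {
       Objects.requireNonNull(newFileId, "newFileId");
       if (currentFileId == null) {
         return true;
       }
       if (!currentFileId.equals(newFileId)) {
         throw new IllegalStateException(String.format(
             "Detected update in location of record with key %s from %s to %s. The fileID should not change.",
             recordKey, currentFileId, newFileId));
       }
       // Same file group: the existing index entry is still valid.
       return false;
     }
   }
   ```

   A follow-up that supports moves between file groups would replace the 
throwing branch with an index update.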



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();

Review Comment:
   Yes, I am moving this inside the initializeFromFilesystem method, before the 
switch statement. That way, readers are loaded when creating other partitions 
in the first iteration of the for loop.
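
   A rough sketch of the intended ordering, with hypothetical names 
(`InitLoopSketch`, `PartitionType`, the step labels) standing in for the real 
initialization calls. It only shows that the reader is opened before the 
switch in every iteration. It does not model what the real reader needs in 
order to open.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch; names and step labels are illustrative only.
   final class InitLoopSketch {

     enum PartitionType { FILES, BLOOM_FILTERS, RECORD_INDEX }

     /** Records the order of steps taken while initializing the given partitions. */
     static List<String> initialize(List<PartitionType> partitionsToInit) {
       List<String> steps = new ArrayList<>();
       for (PartitionType type : partitionsToInit) {
         // Open the reader before dispatching, so every case can rely on it.
         steps.add("openReader");
         switch (type) {
           case FILES:
             steps.add("initFiles");
             break;
           case BLOOM_FILTERS:
             steps.add("initBloomFilters");
             break;
           case RECORD_INDEX:
             steps.add("initRecordIndex");
             break;
           default:
             throw new IllegalArgumentException("Unsupported partition type: " + type);
         }
       }
       return steps;
     }
   }
   ```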



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        initMetadataReader();
+        return true;

Review Comment:
   The return value of the method tracks whether the metadata table is fully 
initialized. If the partitionsToInit list is empty, all the partitions are 
already initialized, so the method should return true.
   Adding this as a code comment.
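
   A minimal sketch of the contract described above, not the actual Hudi 
code. The class `InitContractSketch`, the use of plain strings for partition 
names, and the `pendingDataInstant` flag are assumptions made for 
illustration; only the rule "nothing left to initialize means return true" 
comes from the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the metadata writer; names are illustrative only.
final class InitContractSketch {

  /** Returns the enabled partitions that still need initialization, in order. */
  static List<String> partitionsToInit(List<String> enabled, Set<String> alreadyInitialized) {
    List<String> pending = new ArrayList<>();
    for (String partition : enabled) {
      if (!alreadyInitialized.contains(partition)) {
        pending.add(partition);
      }
    }
    return pending;
  }

  /**
   * Returns true when the table is fully initialized after this call,
   * false when initialization had to be deferred.
   */
  static boolean initializeIfNeeded(List<String> enabled,
                                    Set<String> alreadyInitialized,
                                    boolean pendingDataInstant) {
    List<String> pending = partitionsToInit(enabled, alreadyInitialized);
    if (pending.isEmpty()) {
      // Nothing left to initialize, so the table is already fully initialized.
      return true;
    }
    if (pendingDataInstant) {
      // Assumed behavior: skip this round while another data instant is pending.
      return false;
    }
    // The real writer would initialize each pending partition here.
    return true;
  }
}
```

   With this shape, a caller can store the result in a single `initialized` 
flag without distinguishing "already done" from "done in this call".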



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -111,18 +111,27 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
   public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
 
+  // Virtual keys support for metadata table. This Field is
+  // from the metadata payload schema.
+  private static final String RECORD_KEY_FIELD_NAME = 
HoodieMetadataPayload.KEY_FIELD_NAME;
+
+  // Average size of a record saved within the record index.
+  // Record index has a fixed size schema. This has been calculated based on 
experiments with default settings
+  // for block size (4MB), compression (GZ) and disabling the hudi metadata 
fields.

Review Comment:
   The type and size of the payload differ for each partition, so I think we 
should use this value for RLI alone.
   Do we know how we came up with the 1MB value for block size?
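
   To illustrate why a per-partition average record size matters, here is a 
rough sketch of how such a constant could feed a file group estimate. The 
formula and the class `FileGroupEstimateSketch` are assumptions, not the 
implementation of `HoodieTableMetadataUtil.estimateFileGroupCount`; only the 
parameter names mirror the call in this PR.

```java
// Hypothetical estimate; the real Hudi formula may differ.
final class FileGroupEstimateSketch {

  /**
   * Projects the partition size from the record count and average record size,
   * applies a growth factor, and clamps the resulting file group count.
   */
  static int estimateFileGroupCount(long recordCount,
                                    int averageRecordSize,
                                    int minFileGroupCount,
                                    int maxFileGroupCount,
                                    float growthFactor,
                                    long maxFileGroupSizeBytes) {
    double projectedBytes = (double) recordCount * averageRecordSize * growthFactor;
    long needed = (long) Math.ceil(projectedBytes / maxFileGroupSizeBytes);
    // Clamp into [minFileGroupCount, maxFileGroupCount].
    return (int) Math.max(minFileGroupCount, Math.min(maxFileGroupCount, needed));
  }
}
```

   Because the estimate scales linearly with `averageRecordSize`, a value 
measured for record index payloads would skew the count for partitions whose 
payloads are much larger or smaller.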



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
    */
-  private void initialCommit(String createInstantTime, 
List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap 
= new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh 
table.
-    // this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the 
initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = 
listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-      }
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {

Review Comment:
   Comments are addressed as part of 
[PR-8684](https://github.com/apache/hudi/pull/8684) 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -147,84 +156,55 @@ protected <T extends SpecificRecordBase> 
HoodieBackedTableMetadataWriter(Configu
     this.metrics = Option.empty();
     this.enabledPartitionTypes = new ArrayList<>();
 
-    if (writeConfig.isMetadataTableEnabled()) {
-      this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
-      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, 
failedWritesCleaningPolicy);
-      enabled = true;
-
-      // Inline compaction and auto clean is required as we do not expose this 
table outside
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(),
-          "Cleaning is controlled internally for Metadata table.");
-      
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
-          "Compaction is controlled internally for metadata table.");
-      // Auto commit is required
-      
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
-          "Auto commit is required for Metadata Table");
-      
ValidationUtils.checkArgument(this.metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()),
-          "MDT should use " + FailOnFirstErrorWriteStatus.class.getName());
-      // Metadata Table cannot have metadata listing turned on. (infinite 
loop, much?)
-      
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
-          "File listing cannot be used for Metadata Table");
-
-      this.dataMetaClient =
-          
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
-      enablePartitions();
-      initRegistry();
-      initialize(engineContext, actionMetadata, inflightInstantTimestamp);
-      initTableMetadata();
-    } else {
-      enabled = false;
+    this.dataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
+
+    if (dataMetaClient.getTableConfig().isMetadataTableEnabled() || 
writeConfig.isMetadataTableEnabled()) {
+      this.metadataWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
failedWritesCleaningPolicy);
+
+      try {
+        enablePartitions();
+        initRegistry();
+
+        initialized = initializeIfNeeded(dataMetaClient, actionMetadata, 
inflightInstantTimestamp);
+
+      } catch (IOException e) {
+        LOG.error("Failed to initialize MDT", e);

Review Comment:
   Fixed it.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          .map(basefile -> Pair.of(partition, 
basefile.getFileName())).collect(Collectors.toList()));
+    }
 
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() 
+ " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = 
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
 recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, 
dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), 
dataWriteConfig.getRecordIndexGrowthFactor(),
+        dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+    LOG.info(String.format("Initializing record index with %d mappings and %d 
file groups.", recordCount, fileGroupCount));
+    return Pair.of(fileGroupCount, records);
+  }
+
+  /**
+   * Read the record keys from base files in partitions and return records.
+   */
+  private HoodieData<HoodieRecord> 
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+      List<Pair<String, String>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: 
reading record keys from base files");
+    return engineContext.parallelize(partitionBaseFilePairs, 
partitionBaseFilePairs.size()).flatMap(p -> {
+      final String partition = p.getKey();
+      final String filename = p.getValue();
+      Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + 
Path.SEPARATOR + filename);
+
+      final String fileId = FSUtils.getFileId(filename);
+      final String instantTime = FSUtils.getCommitTime(filename);
+      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(),
 dataFilePath);
+      Iterator<String> recordKeyIterator = reader.getRecordKeyIterator();
+
+      return new Iterator<HoodieRecord>() {
+        @Override
+        public boolean hasNext() {
+          return recordKeyIterator.hasNext();
+        }
+
+        @Override
+        public HoodieRecord next() {
+          return 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId,
+              instantTime);
+        }
+      };
+    });
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition(String createInstantTime, List<DirectoryInfo> 
partitionInfoList) {

Review Comment:
   Good catch. Removed it.
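
   For reference, the suffix probing done by `generateUniqueCommitInstantTime` 
in the hunk above can be sketched as follows. The three-digit suffix format 
and the class `UniqueInstantSketch` are assumptions; the actual format is 
whatever `HoodieTableMetadataUtil.createIndexInitTimestamp` produces, and the 
set of strings stands in for the MDT commits timeline.

```java
import java.util.Set;

// Hypothetical sketch of probing for an unused instant time.
final class UniqueInstantSketch {

  /** Assumed format: the base timestamp followed by a zero-padded 3-digit offset. */
  static String withSuffix(String initializationTime, int offset) {
    return String.format("%s%03d", initializationTime, offset);
  }

  /** Returns the first suffixed timestamp not already present in the timeline. */
  static String generateUnique(String initializationTime, Set<String> existingInstants) {
    for (int offset = 0; ; ++offset) {
      String candidate = withSuffix(initializationTime, offset);
      if (!existingInstants.contains(candidate)) {
        return candidate;
      }
    }
  }
}
```

   Each partition initialized in the same run then gets its own instant time, 
since the timeline is reloaded after every bulk commit.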



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        initMetadataReader();
+        return true;
+      }
+
+      // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);

Review Comment:
   A solo commit is used when there are no commits in the main table timeline, 
so there will not be any records to write to the record index. As part of 
initialization, we will create an empty delete block and commit it.
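
   A small sketch of the fallback described above. The class 
`InitializationTimeSketch`, the list of timestamp strings standing in for the 
data table timeline, and the placeholder value given to 
`SOLO_COMMIT_TIMESTAMP` are assumptions for illustration.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch; not the Hudi timeline API.
final class InitializationTimeSketch {

  // Placeholder value assumed for this example only.
  static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";

  /** Uses the latest completed instant, or the solo commit time for a fresh table. */
  static String initializationTime(List<String> completedInstantTimes) {
    return completedInstantTimes.stream()
        .max(Comparator.naturalOrder())
        .orElse(SOLO_COMMIT_TIMESTAMP);
  }

  /** With the solo commit time there is no data yet, so nothing to index. */
  static boolean hasRecordsToIndex(String initializationTime) {
    return !SOLO_COMMIT_TIMESTAMP.equals(initializationTime);
  }
}
```

   When `hasRecordsToIndex` is false, the initialization still needs a commit 
on the metadata table, which is where the empty delete block comes in.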



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          .map(basefile -> Pair.of(partition, 
basefile.getFileName())).collect(Collectors.toList()));
+    }
 
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() 
+ " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = 
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
 recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, 
dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), 
dataWriteConfig.getRecordIndexGrowthFactor(),
+        dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+    LOG.info(String.format("Initializing record index with %d mappings and %d 
file groups.", recordCount, fileGroupCount));
+    return Pair.of(fileGroupCount, records);
+  }
+
+  /**
+   * Read the record keys from base files in partitions and return records.
+   */
+  private HoodieData<HoodieRecord> 
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+      List<Pair<String, String>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: 
reading record keys from base files");
+    return engineContext.parallelize(partitionBaseFilePairs, 
partitionBaseFilePairs.size()).flatMap(p -> {
+      final String partition = p.getKey();
+      final String filename = p.getValue();
+      Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + 
Path.SEPARATOR + filename);
+
+      final String fileId = FSUtils.getFileId(filename);
+      final String instantTime = FSUtils.getCommitTime(filename);
+      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(),
 dataFilePath);
+      Iterator<String> recordKeyIterator = reader.getRecordKeyIterator();
+
+      return new Iterator<HoodieRecord>() {
+        @Override
+        public boolean hasNext() {
+          return recordKeyIterator.hasNext();
+        }
+
+        @Override
+        public HoodieRecord next() {
+          return 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId,
+              instantTime);
+        }
+      };
+    });
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition(String createInstantTime, List<DirectoryInfo> 
partitionInfoList) {
+    // FILES partition uses a single file group
+    final int fileGroupCount = 1;
+
+    List<String> partitions = partitionInfoList.stream().map(p -> 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()))
+        .collect(Collectors.toList());
+    final int totalDataFilesCount = 
partitionInfoList.stream().mapToInt(DirectoryInfo::getTotalFiles).sum();
+    LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata"); //pwason reword

Review Comment:
   Done.
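
As a side note on the hunk quoted above: `generateUniqueCommitInstantTime` probes suffixed timestamps until it finds one that the MDT timeline does not yet contain. A minimal standalone sketch of that loop follows. The `withSuffix` helper and its three-digit suffix are assumptions standing in for `HoodieTableMetadataUtil.createIndexInitTimestamp`, whose actual format is not shown in this diff, and a plain `Set` stands in for the metadata timeline.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

final class UniqueInstantTimeSketch {

  // Hypothetical stand-in for HoodieTableMetadataUtil.createIndexInitTimestamp:
  // appends a zero-padded offset to the base timestamp (format is an assumption).
  static String withSuffix(String initializationTime, int offset) {
    return initializationTime + String.format(Locale.ROOT, "%03d", offset);
  }

  // Tries offsets 0, 1, 2, ... and returns the first candidate that is not
  // already present in the set of used instant times.
  static String firstUnusedInstantTime(String initializationTime, Set<String> usedInstantTimes) {
    for (int offset = 0; ; ++offset) {
      String candidate = withSuffix(initializationTime, offset);
      if (!usedInstantTimes.contains(candidate)) {
        return candidate;
      }
    }
  }

  public static void main(String[] args) {
    // The set plays the role of the MDT commits timeline in this sketch.
    Set<String> used = new HashSet<>();
    for (String partition : new String[] {"FILES", "COLUMN_STATS", "RECORD_INDEX"}) {
      String instant = firstUnusedInstantTime("20230609120000000", used);
      used.add(instant); // corresponds to the commit landing on the timeline
      System.out.println(partition + " -> " + instant);
    }
  }
}
```

Because each partition's commit is added to the set before the next lookup, every partition initialized in the same run receives a distinct instant time, which is the property the Javadoc in the diff asks for.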



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          .map(basefile -> Pair.of(partition, 
basefile.getFileName())).collect(Collectors.toList()));
+    }
 
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() 
+ " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = 
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
 recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, 
dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), 
dataWriteConfig.getRecordIndexGrowthFactor(),
+        dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+    LOG.info(String.format("Initializing record index with %d mappings and %d 
file groups.", recordCount, fileGroupCount));
+    return Pair.of(fileGroupCount, records);
+  }
+
+  /**
+   * Read the record keys from base files in partitions and return records.
+   */
+  private HoodieData<HoodieRecord> 
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+      List<Pair<String, String>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: 
reading record keys from base files");
+    return engineContext.parallelize(partitionBaseFilePairs, 
partitionBaseFilePairs.size()).flatMap(p -> {
+      final String partition = p.getKey();
+      final String filename = p.getValue();
+      Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + 
Path.SEPARATOR + filename);
+
+      final String fileId = FSUtils.getFileId(filename);
+      final String instantTime = FSUtils.getCommitTime(filename);
+      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(),
 dataFilePath);
+      Iterator<String> recordKeyIterator = reader.getRecordKeyIterator();
+
+      return new Iterator<HoodieRecord>() {
+        @Override
+        public boolean hasNext() {
+          return recordKeyIterator.hasNext();
+        }
+
+        @Override
+        public HoodieRecord next() {
+          return 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId,

Review Comment:
   Oh nice, I had not seen these classes before. I will update the code accordingly.
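
The classes the reviewer pointed to are not named in this message. As a rough illustration only, the anonymous `Iterator` in the quoted hunk follows a lazy "map each key to a record" pattern, which can be factored into a small generic adapter. `MappingIteratorSketch` and its `map` method are hypothetical names, not Hudi APIs, and strings stand in for `HoodieRecord`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

final class MappingIteratorSketch {

  // Wraps a source iterator and applies the mapper lazily, one element per next() call,
  // so no intermediate collection of mapped elements is built.
  static <I, O> Iterator<O> map(Iterator<I> source, Function<? super I, ? extends O> mapper) {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public O next() {
        return mapper.apply(source.next());
      }
    };
  }

  public static void main(String[] args) {
    // Record keys stand in for reader.getRecordKeyIterator(); the mapped string
    // stands in for the record-index update built per key.
    Iterator<String> recordKeys = Arrays.asList("key-1", "key-2").iterator();
    Iterator<String> updates = map(recordKeys, key -> key + " -> partition=p1, fileId=f1");
    while (updates.hasNext()) {
      System.out.println(updates.next());
    }
  }
}
```

An adapter like this keeps the per-file lambda short, though it does not by itself address closing the underlying file reader once iteration finishes.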



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -370,11 +336,10 @@ private <T extends SpecificRecordBase> boolean 
isBootstrapNeeded(Option<HoodieIn
     }
 
     // Detect the commit gaps if any from the data and the metadata active 
timeline
-    if 
(dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
-        latestMetadataInstant.get().getTimestamp())
+    if 
(dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstantTimestamp)
         && !isCommitRevertedByInFlightAction(actionMetadata, 
latestMetadataInstantTimestamp)) {
       LOG.error("Metadata Table will need to be re-initialized as un-synced 
instants have been archived."
-          + " latestMetadataInstant=" + 
latestMetadataInstant.get().getTimestamp()
+          + " latestMetadataInstant=" + latestMetadataInstantTimestamp

Review Comment:
   The new sync design is a two-phase commit, so I also think this 
scenario should not be hit.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -933,52 +1041,80 @@ protected HoodieData<HoodieRecord> 
prepRecords(Map<MetadataPartitionType,
   }
 
   /**
-   *  Perform a compaction on the Metadata Table.
-   *
-   * Cases to be handled:
-   *   1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
-   *      a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
-   *
-   *   2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
-   *      deltacommit.
+   * Optimize the metadata table by running compaction, clean and archive as 
required.
+   * <p>
+   * Don't perform optimization if there are inflight operations on the 
dataset. This is for two reasons:
+   * - The compaction will contain the correct data as all failed operations 
have been rolled back.
+   * - Clean/compaction etc. will have the highest timestamp on the MDT and we 
won't be adding new operations
+   * with smaller timestamps to metadata table (makes for easier debugging)
+   * <p>
+   * This adds the limitations that long-running async operations (clustering, 
etc.) may cause delay in such MDT
+   * optimizations. We will relax this after MDT code has been hardened.
    */
-  protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String 
instantTime) {
-    // finish off any pending compactions if any from previous attempt.
-    writeClient.runAnyPendingCompactions();
-
-    String latestDeltaCommitTimeInMetadataTable = 
metadataMetaClient.reloadActiveTimeline()
-        .getDeltaCommitTimeline()
-        .filterCompletedInstants()
-        .lastInstant().orElseThrow(() -> new HoodieMetadataException("No 
completed deltacommit in metadata table"))
-        .getTimestamp();
-    // we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
-    // Whenever you want to change this logic, please ensure all below 
scenarios are considered.
-    // a. There could be a chance that latest delta commit in MDT is committed 
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
-    // b. There could be DT inflights after latest delta commit in MDT and we 
are ok with it. bcoz, the contract is, latest compaction instant time in MDT 
represents
-    // any instants before that is already synced with metadata table.
-    // c. Do consider out of order commits. For eg, c4 from DT could complete 
before c3. and we can't trigger compaction in MDT with c4 as base instant time, 
until every
-    // instant before c4 is synced with metadata table.
-    List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+  @Override
+  public void performTableServices(Option<String> inFlightInstantTimestamp) {
+    HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
+    boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
+    try {
+      BaseHoodieWriteClient writeClient = getWriteClient();
+      // Run any pending table services operations.
+      runPendingTableServicesOperations(writeClient);
+
+      // Check and run clean operations.
+      String latestDeltacommitTime = 
metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline()
+          .filterCompletedInstants()
+          .lastInstant().get()
+          .getTimestamp();
+      LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + 
", running clean operations.");

Review Comment:
   Comments are addressed as part of 
[PR-8684](https://github.com/apache/hudi/pull/8684) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
