This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c12bac435a8 [HUDI-7692] Extract metadata record type to 
MetadataPartitionType enum (#11597)
c12bac435a8 is described below

commit c12bac435a82c3c0dc69195f723b0c6335edc3a3
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Aug 6 19:36:44 2024 +0530

    [HUDI-7692] Extract metadata record type to MetadataPartitionType enum 
(#11597)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   2 +-
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |   4 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   2 +-
 .../hudi/metadata/HoodieMetadataPayload.java       | 376 ++++-----------------
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 118 +++++++
 .../hudi/metadata/MetadataPartitionType.java       | 252 +++++++++++++-
 .../hudi/metadata/TestMetadataPartitionType.java   |  16 +-
 .../hudi/dml/TestHoodieTableValuedFunction.scala   |   7 +-
 9 files changed, 442 insertions(+), 337 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ffbffc104b1..4d8fec87556 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -229,7 +229,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
                                        Option<String> 
inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
-    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.getValidValues().length);
 
     try {
       boolean exists = metadataTableExists(dataMetaClient);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 9e51ef99ae4..a70f5e7edba 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1002,7 +1002,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * Deletes the metadata partition if the writer disables any metadata index.
    */
   public void deleteMetadataIndexIfNecessary() {
-    Stream.of(MetadataPartitionType.values()).forEach(partitionType -> {
+    Stream.of(MetadataPartitionType.getValidValues()).forEach(partitionType -> 
{
       if (shouldDeleteMetadataPartition(partitionType)) {
         try {
           LOG.info("Deleting metadata partition because it is disabled in 
writer: " + partitionType.name());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 4e3d98f52bb..22c9d9eeb61 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -544,10 +544,10 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
             .withMetadataIndexBloomFilter(true)
             .withEnableRecordIndex(true).build())
         .build();
-    for (MetadataPartitionType partitionType : MetadataPartitionType.values()) 
{
+    for (MetadataPartitionType partitionType : 
MetadataPartitionType.getValidValues()) {
       metaClient.getTableConfig().setMetadataPartitionState(metaClient, 
partitionType.getPartitionPath(), true);
     }
-    metaClient.getTableConfig().setMetadataPartitionsInflight(metaClient, 
MetadataPartitionType.values());
+    metaClient.getTableConfig().setMetadataPartitionsInflight(metaClient, 
MetadataPartitionType.getValidValues());
     String metadataTableBasePath = Paths.get(basePath, 
METADATA_TABLE_FOLDER_PATH).toString();
     HoodieTableMetaClient metadataTableMetaClient = 
HoodieTestUtils.init(metadataTableBasePath, MERGE_ON_READ);
     HoodieMetadataTestTable.of(metadataTableMetaClient)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 0ef17e9e233..da21749a9d8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -750,7 +750,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   public Map<String, String> stats() {
-    Set<String> allMetadataPartitionPaths = 
Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+    Set<String> allMetadataPartitionPaths = 
Arrays.stream(MetadataPartitionType.getValidValues()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
     return metrics.map(m -> m.getStats(true, metadataMetaClient, this, 
allMetadataPartitionPaths)).orElseGet(HashMap::new);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index bbdf6be5a65..e4a3420aeba 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -47,8 +47,6 @@ import org.apache.hudi.util.Lazy;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -56,7 +54,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,12 +63,16 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getLocationFromRecordIndexInfo;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey;
 
 /**
  * MetadataTable records are persisted with the schema defined in 
HoodieMetadata.avsc.
@@ -98,19 +99,6 @@ import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L
  * During compaction on the table, the deletions are merged with additions and 
hence records are pruned.
  */
 public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadataPayload> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMetadataPayload.class);
-  /**
-   * Type of the record. This can be an enum in the schema but Avro1.8
-   * has a bug - <a 
href="https://issues.apache.org/jira/browse/AVRO-1810";>...</a>
-   */
-  private static final int METADATA_TYPE_PARTITION_LIST = 1;
-  private static final int METADATA_TYPE_FILE_LIST = 2;
-  private static final int METADATA_TYPE_COLUMN_STATS = 3;
-  private static final int METADATA_TYPE_BLOOM_FILTER = 4;
-  private static final int METADATA_TYPE_RECORD_INDEX = 5;
-  private static final int METADATA_TYPE_PARTITION_STATS = 6;
-  private static final int METADATA_TYPE_SECONDARY_INDEX = 7;
-
   /**
    * HoodieMetadata schema field ids
    */
@@ -125,11 +113,11 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
   /**
    * HoodieMetadata bloom filter payload field ids
    */
-  private static final String FIELD_IS_DELETED = "isDeleted";
-  private static final String BLOOM_FILTER_FIELD_TYPE = "type";
-  private static final String BLOOM_FILTER_FIELD_TIMESTAMP = "timestamp";
-  private static final String BLOOM_FILTER_FIELD_BLOOM_FILTER = "bloomFilter";
-  private static final String BLOOM_FILTER_FIELD_IS_DELETED = FIELD_IS_DELETED;
+  public static final String FIELD_IS_DELETED = "isDeleted";
+  public static final String BLOOM_FILTER_FIELD_TYPE = "type";
+  public static final String BLOOM_FILTER_FIELD_TIMESTAMP = "timestamp";
+  public static final String BLOOM_FILTER_FIELD_BLOOM_FILTER = "bloomFilter";
+  public static final String BLOOM_FILTER_FIELD_IS_DELETED = FIELD_IS_DELETED;
 
   /**
    * HoodieMetadata column stats payload field ids
@@ -184,15 +172,15 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    * <p>
    * You can find more details in HUDI-3834.
    */
-  private static final Lazy<HoodieMetadataColumnStats.Builder> 
METADATA_COLUMN_STATS_BUILDER_STUB = 
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
+  public static final Lazy<HoodieMetadataColumnStats.Builder> 
METADATA_COLUMN_STATS_BUILDER_STUB = 
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
   private static final HoodieMetadataFileInfo DELETE_FILE_METADATA = new 
HoodieMetadataFileInfo(0L, true);
-  private String key = null;
-  private int type = 0;
-  private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
-  private HoodieMetadataBloomFilter bloomFilterMetadata = null;
-  private HoodieMetadataColumnStats columnStatMetadata = null;
-  private HoodieRecordIndexInfo recordIndexMetadata;
-  private HoodieSecondaryIndexInfo secondaryIndexMetadata;
+  protected String key = null;
+  protected int type = 0;
+  protected Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
+  protected HoodieMetadataBloomFilter bloomFilterMetadata = null;
+  protected HoodieMetadataColumnStats columnStatMetadata = null;
+  protected HoodieRecordIndexInfo recordIndexMetadata;
+  protected HoodieSecondaryIndexInfo secondaryIndexMetadata;
   private boolean isDeletedRecord = false;
 
   public HoodieMetadataPayload(@Nullable GenericRecord record, Comparable<?> 
orderingVal) {
@@ -210,97 +198,30 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
       //       in any (read-)projected schema
       key = record.get(KEY_FIELD_NAME).toString();
       type = (int) record.get(SCHEMA_FIELD_NAME_TYPE);
-
-      if (type == METADATA_TYPE_FILE_LIST || type == 
METADATA_TYPE_PARTITION_LIST) {
-        Map<String, HoodieMetadataFileInfo> metadata = 
getNestedFieldValue(record, SCHEMA_FIELD_NAME_METADATA);
-        if (metadata != null) {
-          filesystemMetadata = metadata;
-          filesystemMetadata.keySet().forEach(k -> {
-            GenericRecord v = filesystemMetadata.get(k);
-            filesystemMetadata.put(k, new HoodieMetadataFileInfo((Long) 
v.get("size"), (Boolean) v.get("isDeleted")));
-          });
-        }
-      } else if (type == METADATA_TYPE_BLOOM_FILTER) {
-        GenericRecord bloomFilterRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_BLOOM_FILTER);
-        // NOTE: Only legitimate reason for {@code BloomFilterMetadata} to not 
be present is when
-        //       it's not been read from the storage (ie it's not been a part 
of projected schema).
-        //       Otherwise, it has to be present or the record would be 
considered invalid
-        if (bloomFilterRecord == null) {
-          
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_BLOOM_FILTER) == null,
-              String.format("Valid %s record expected for type: %s", 
SCHEMA_FIELD_ID_BLOOM_FILTER, METADATA_TYPE_BLOOM_FILTER));
-        } else {
-          bloomFilterMetadata = new HoodieMetadataBloomFilter(
-              (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TYPE),
-              (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TIMESTAMP),
-              (ByteBuffer) 
bloomFilterRecord.get(BLOOM_FILTER_FIELD_BLOOM_FILTER),
-              (Boolean) bloomFilterRecord.get(BLOOM_FILTER_FIELD_IS_DELETED)
-          );
-        }
-      } else if (type == METADATA_TYPE_COLUMN_STATS || type == 
METADATA_TYPE_PARTITION_STATS) {
-        GenericRecord columnStatsRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_COLUMN_STATS);
-        // NOTE: Only legitimate reason for {@code ColumnStatsMetadata} to not 
be present is when
-        //       it's not been read from the storage (ie it's not been a part 
of projected schema).
-        //       Otherwise, it has to be present or the record would be 
considered invalid
-        if (columnStatsRecord == null) {
-          
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) == null,
-              String.format("Valid %s record expected for type: %s", 
SCHEMA_FIELD_ID_COLUMN_STATS, METADATA_TYPE_COLUMN_STATS));
-        } else {
-          columnStatMetadata = 
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
-              .setFileName((String) 
columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
-              .setColumnName((String) 
columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
-              // AVRO-2377 1.9.2 Modified the type of 
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
-              // This causes Kryo to fail when deserializing a GenericRecord, 
See HUDI-5484.
-              // We should avoid using GenericRecord and convert GenericRecord 
into a serializable type.
-              
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
-              
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
-              .setValueCount((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
-              .setNullCount((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
-              .setTotalSize((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
-              .setTotalUncompressedSize((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
-              .setIsDeleted((Boolean) 
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
-              .build();
-        }
-      } else if (type == METADATA_TYPE_RECORD_INDEX) {
-        GenericRecord recordIndexRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_RECORD_INDEX);
-        Object recordIndexPosition = 
recordIndexRecord.get(RECORD_INDEX_FIELD_POSITION);
-        recordIndexMetadata = new 
HoodieRecordIndexInfo(recordIndexRecord.get(RECORD_INDEX_FIELD_PARTITION).toString(),
-            
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_HIGH_BITS).toString()),
-            
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_LOW_BITS).toString()),
-            
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILE_INDEX).toString()),
-            recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID).toString(),
-            
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()),
-            
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_ENCODING).toString()),
-            recordIndexPosition != null ? 
Long.parseLong(recordIndexPosition.toString()) : null);
-      } else if (type == METADATA_TYPE_SECONDARY_INDEX) {
-        GenericRecord secondaryIndexRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_SECONDARY_INDEX);
-        checkState(secondaryIndexRecord != null, "Valid SecondaryIndexMetadata 
record expected for type: " + METADATA_TYPE_SECONDARY_INDEX);
-        secondaryIndexMetadata = new HoodieSecondaryIndexInfo(
-            
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_RECORD_KEY).toString(),
-            (Boolean) 
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_IS_DELETED));
-      }
+      MetadataPartitionType.get(type).constructMetadataPayload(this, record);
     } else {
       this.isDeletedRecord = true;
     }
   }
 
-  private HoodieMetadataPayload(String key, int type, Map<String, 
HoodieMetadataFileInfo> filesystemMetadata) {
+  protected HoodieMetadataPayload(String key, int type, Map<String, 
HoodieMetadataFileInfo> filesystemMetadata) {
     this(key, type, filesystemMetadata, null, null, null, null);
   }
 
-  private HoodieMetadataPayload(String key, HoodieMetadataBloomFilter 
metadataBloomFilter) {
-    this(key, METADATA_TYPE_BLOOM_FILTER, null, metadataBloomFilter, null, 
null, null);
+  protected HoodieMetadataPayload(String key, HoodieMetadataBloomFilter 
metadataBloomFilter) {
+    this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, 
metadataBloomFilter, null, null, null);
   }
 
-  private HoodieMetadataPayload(String key, HoodieMetadataColumnStats 
columnStats) {
-    this(key, METADATA_TYPE_COLUMN_STATS, null, null, columnStats, null, null);
+  protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats 
columnStats) {
+    this(key, MetadataPartitionType.COLUMN_STATS.getRecordType(), null, null, 
columnStats, null, null);
   }
 
   private HoodieMetadataPayload(String key, HoodieRecordIndexInfo 
recordIndexMetadata) {
-    this(key, METADATA_TYPE_RECORD_INDEX, null, null, null, 
recordIndexMetadata, null);
+    this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null, 
null, recordIndexMetadata, null);
   }
 
   private HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo 
secondaryIndexMetadata) {
-    this(key, METADATA_TYPE_SECONDARY_INDEX, null, null, null, null, 
secondaryIndexMetadata);
+    this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null, 
null, null, null, secondaryIndexMetadata);
   }
 
   protected HoodieMetadataPayload(String key, int type,
@@ -334,11 +255,10 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions, boolean isDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> 
fileInfo.put(HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition),
 new HoodieMetadataFileInfo(0L, isDeleted)));
+    partitions.forEach(partition -> 
fileInfo.put(getPartitionIdentifierForFilesPartition(partition), new 
HoodieMetadataFileInfo(0L, isDeleted)));
 
-    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.getPartitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
-        fileInfo);
+    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.ALL_PARTITIONS.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), 
MetadataPartitionType.ALL_PARTITIONS.getRecordType(), fileInfo);
     return new HoodieAvroRecord<>(key, payload);
   }
 
@@ -352,7 +272,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
                                                                                
Map<String, Long> filesAdded,
                                                                                
List<String> filesDeleted) {
-    String partitionIdentifier = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);
+    String partitionIdentifier = 
getPartitionIdentifierForFilesPartition(partition);
     int size = filesAdded.size() + filesDeleted.size();
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(size, 1);
     filesAdded.forEach((fileName, fileSize) -> fileInfo.put(fileName, new 
HoodieMetadataFileInfo(fileSize, false)));
@@ -360,7 +280,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     filesDeleted.forEach(fileName -> fileInfo.put(fileName, 
DELETE_FILE_METADATA));
 
     HoodieKey key = new HoodieKey(partitionIdentifier, 
MetadataPartitionType.FILES.getPartitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), 
MetadataPartitionType.FILES.getRecordType(), fileInfo);
     return new HoodieAvroRecord<>(key, payload);
   }
 
@@ -409,54 +329,14 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     checkArgument(previousRecord.key.equals(key),
         "Cannot combine " + previousRecord.key + " with " + key + " as the 
keys differ");
 
-    switch (type) {
-      case METADATA_TYPE_PARTITION_LIST:
-      case METADATA_TYPE_FILE_LIST:
-        Map<String, HoodieMetadataFileInfo> combinedFileInfo = 
combineFileSystemMetadata(previousRecord);
-        return new HoodieMetadataPayload(key, type, combinedFileInfo);
-      case METADATA_TYPE_BLOOM_FILTER:
-        HoodieMetadataBloomFilter combineBloomFilterMetadata = 
combineBloomFilterMetadata(previousRecord);
-        return new HoodieMetadataPayload(key, combineBloomFilterMetadata);
-      case METADATA_TYPE_COLUMN_STATS:
-        return new HoodieMetadataPayload(key, 
combineColumnStatsMetadata(previousRecord));
-      case METADATA_TYPE_RECORD_INDEX:
-        // There is always a single mapping and the latest mapping is 
maintained.
-        // Mappings in record index can change in two scenarios:
-        // 1. A key deleted from dataset and then added again (new filedID)
-        // 2. A key moved to a different file due to clustering
-
-        // No need to merge with previous record index, always pick the latest 
payload.
-        return this;
-      case METADATA_TYPE_SECONDARY_INDEX:
-        // Secondary Index combine()/merge() always returns the current 
(*this*)
-        // record and discards the prevRecord. Based on the 'isDeleted' marker 
in the payload,
-        // the merger running on top takes the right action (discard current 
or retain current record).
-        return this;
-      default:
-        throw new HoodieMetadataException("Unknown type of 
HoodieMetadataPayload: " + type);
-    }
+    return 
MetadataPartitionType.get(type).combineMetadataPayloads(previousRecord, this);
   }
 
   private static String getBloomFilterRecordKey(String partitionName, String 
fileName) {
-    return new 
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionName)).asBase64EncodedString()
+    return new 
PartitionIndexID(getBloomFilterIndexPartitionIdentifier(partitionName)).asBase64EncodedString()
         .concat(new FileIndexID(fileName).asBase64EncodedString());
   }
 
-  private HoodieMetadataBloomFilter 
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
-    // Bloom filters are always additive. No need to merge with previous bloom 
filter
-    return this.bloomFilterMetadata;
-  }
-
-  private HoodieMetadataColumnStats 
combineColumnStatsMetadata(HoodieMetadataPayload previousRecord) {
-    checkArgument(previousRecord.getColumnStatMetadata().isPresent());
-    checkArgument(getColumnStatMetadata().isPresent());
-
-    HoodieMetadataColumnStats previousColStatsRecord = 
previousRecord.getColumnStatMetadata().get();
-    HoodieMetadataColumnStats newColumnStatsRecord = 
getColumnStatMetadata().get();
-
-    return mergeColumnStatsRecords(previousColStatsRecord, 
newColumnStatsRecord);
-  }
-
   public static Option<HoodieRecord<HoodieMetadataPayload>> 
combineSecondaryIndexRecord(
       HoodieRecord<HoodieMetadataPayload> oldRecord,
       HoodieRecord<HoodieMetadataPayload> newRecord) {
@@ -555,66 +435,6 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     return filesystemMetadata.entrySet().stream().filter(e -> 
e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> 
combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
-
-    // First, add all files listed in the previous record
-    if (previousRecord.filesystemMetadata != null) {
-      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
-    }
-
-    // Second, merge in the files listed in the new record
-    if (filesystemMetadata != null) {
-      validatePayload(type, filesystemMetadata);
-
-      filesystemMetadata.forEach((key, fileInfo) -> {
-        combinedFileInfo.merge(key, fileInfo,
-            // Combine previous record w/ the new one, new records taking 
precedence over
-            // the old one
-            //
-            // NOTE: That if previous listing contains the file that is being 
deleted by the tombstone
-            //       record (`IsDeleted` = true) in the new one, we simply 
delete the file from the resulting
-            //       listing as well as drop the tombstone itself.
-            //       However, if file is not present in the previous record we 
have to persist tombstone
-            //       record in the listing to make sure we carry forward 
information that this file
-            //       was deleted. This special case could occur since the 
merging flow is 2-stage:
-            //          - First we merge records from all of the delta 
log-files
-            //          - Then we merge records from base-files with the delta 
ones (coming as a result
-            //          of the previous step)
-            (oldFileInfo, newFileInfo) -> {
-              // NOTE: We can’t assume that MT update records will be ordered 
the same way as actual
-              //       FS operations (since they are not atomic), therefore MT 
record merging should be a
-              //       _commutative_ & _associative_ operation (ie one that 
would work even in case records
-              //       will get re-ordered), which is
-              //          - Possible for file-sizes (since file-sizes will 
ever grow, we can simply
-              //          take max of the old and new records)
-              //          - Not possible for is-deleted flags*
-              //
-              //       *However, we’re assuming that the case of concurrent 
write and deletion of the same
-              //       file is _impossible_ -- it would only be possible with 
concurrent upsert and
-              //       rollback operation (affecting the same log-file), which 
is implausible, b/c either
-              //       of the following have to be true:
-              //          - We’re appending to failed log-file (then the other 
writer is trying to
-              //          rollback it concurrently, before it’s own write)
-              //          - Rollback (of completed instant) is running 
concurrently with append (meaning
-              //          that restore is running concurrently with a write, 
which is also nut supported
-              //          currently)
-              if (newFileInfo.getIsDeleted()) {
-                if (oldFileInfo.getIsDeleted()) {
-                  LOG.warn("A file is repeatedly deleted in the files 
partition of the metadata table: " + key);
-                  return newFileInfo;
-                }
-                return null;
-              }
-              return new HoodieMetadataFileInfo(
-                  Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), 
false);
-            });
-      });
-    }
-
-    return combinedFileInfo;
-  }
-
   /**
    * Get bloom filter index key.
    *
@@ -649,8 +469,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    * @return Column stats index key
    */
   public static String getColumnStatsIndexKey(String partitionName, 
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata) {
-
-    final PartitionIndexID partitionIndexID = new 
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName));
+    final PartitionIndexID partitionIndexID = new 
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionName));
     final FileIndexID fileIndexID = new FileIndexID(new 
StoragePath(columnRangeMetadata.getFilePath()).getName());
     final ColumnIndexID columnIndexID = new 
ColumnIndexID(columnRangeMetadata.getColumnName());
     return getColumnStatsIndexKey(partitionIndexID, fileIndexID, 
columnIndexID);
@@ -703,54 +522,6 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     });
   }
 
-  public static String getPartitionStatsIndexKey(String partitionPath, String 
columnName) {
-    final PartitionIndexID partitionIndexID = new 
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionPath));
-    final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
-    return 
columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString());
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private static HoodieMetadataColumnStats 
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
-                                                                   
HoodieMetadataColumnStats newColumnStats) {
-    checkArgument(Objects.equals(prevColumnStats.getColumnName(), 
newColumnStats.getColumnName()));
-
-    // We're handling 2 cases in here
-    //  - New record is a tombstone: in this case it simply overwrites 
previous state
-    //  - Previous record is a tombstone: in that case new proper record would 
also
-    //    be simply overwriting previous state
-    if (newColumnStats.getIsDeleted() || prevColumnStats.getIsDeleted()) {
-      return newColumnStats;
-    }
-
-    Comparable minValue =
-        (Comparable) Stream.of(
-                (Comparable) 
unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
-                (Comparable) 
unwrapAvroValueWrapper(newColumnStats.getMinValue()))
-        .filter(Objects::nonNull)
-        .min(Comparator.naturalOrder())
-        .orElse(null);
-
-    Comparable maxValue =
-        (Comparable) Stream.of(
-                (Comparable) 
unwrapAvroValueWrapper(prevColumnStats.getMaxValue()),
-                (Comparable) 
unwrapAvroValueWrapper(newColumnStats.getMaxValue()))
-        .filter(Objects::nonNull)
-        .max(Comparator.naturalOrder())
-        .orElse(null);
-
-    return 
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
-        .setFileName(newColumnStats.getFileName())
-        .setColumnName(newColumnStats.getColumnName())
-        .setMinValue(wrapValueIntoAvro(minValue))
-        .setMaxValue(wrapValueIntoAvro(maxValue))
-        .setValueCount(prevColumnStats.getValueCount() + 
newColumnStats.getValueCount())
-        .setNullCount(prevColumnStats.getNullCount() + 
newColumnStats.getNullCount())
-        .setTotalSize(prevColumnStats.getTotalSize() + 
newColumnStats.getTotalSize())
-        .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() + 
newColumnStats.getTotalUncompressedSize())
-        .setIsDeleted(newColumnStats.getIsDeleted())
-        .build();
-  }
-
   /**
    * Create and return a {@code HoodieMetadataPayload} to insert or update an 
entry for the record index.
    * <p>
@@ -796,7 +567,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
               uuid.getMostSignificantBits(),
               uuid.getLeastSignificantBits(),
               fileIndex,
-              "",
+              EMPTY_STRING,
               instantTimeMillis,
               0,
               null));
@@ -821,9 +592,9 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    * <p>
    * Each entry maps the secondary key of a single record in HUDI to its 
record (or primary) key
    *
-   * @param recordKey     Primary key of the record
-   * @param secondaryKey  Secondary key of the record
-   * @param isDeleted     true if this record is deleted
+   * @param recordKey    Primary key of the record
+   * @param secondaryKey Secondary key of the record
+   * @param isDeleted    true if this record is deleted
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createSecondaryIndex(String recordKey, String secondaryKey, String 
partitionPath, Boolean isDeleted) {
 
@@ -854,7 +625,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    * If this is a record-level index entry, returns the file to which this is 
mapped.
    */
   public HoodieRecordGlobalLocation getRecordGlobalLocation() {
-    return 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(recordIndexMetadata);
+    return getLocationFromRecordIndexInfo(recordIndexMetadata);
   }
 
   public boolean isDeleted() {
@@ -890,54 +661,29 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     sb.append(KEY_FIELD_NAME + "=").append(key).append(", ");
     sb.append(SCHEMA_FIELD_NAME_TYPE + "=").append(type).append(", ");
 
-    switch (type) {
-      case METADATA_TYPE_PARTITION_LIST:
-      case METADATA_TYPE_FILE_LIST:
-        sb.append("Files: {");
-        
sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(",
 ");
-        
sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(",
 ");
-        sb.append("}");
-        break;
-      case METADATA_TYPE_BLOOM_FILTER:
-        checkState(getBloomFilterMetadata().isPresent());
-        sb.append("BloomFilter: {");
-        sb.append("bloom size: 
").append(getBloomFilterMetadata().get().getBloomFilter().array().length).append(",
 ");
-        sb.append("timestamp: 
").append(getBloomFilterMetadata().get().getTimestamp()).append(", ");
-        sb.append("deleted: 
").append(getBloomFilterMetadata().get().getIsDeleted());
-        sb.append("}");
-        break;
-      case METADATA_TYPE_COLUMN_STATS:
-        checkState(getColumnStatMetadata().isPresent());
-        sb.append("ColStats: {");
-        sb.append(getColumnStatMetadata().get());
-        sb.append("}");
-        break;
-      case METADATA_TYPE_RECORD_INDEX:
-        sb.append("RecordIndex: {");
-        sb.append("location=").append(getRecordGlobalLocation());
-        sb.append("}");
-        break;
-      default:
-        break;
+    if (type == MetadataPartitionType.FILES.getRecordType() || type == 
MetadataPartitionType.ALL_PARTITIONS.getRecordType()) {
+      sb.append("Files: {");
+      
sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(",
 ");
+      
sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(",
 ");
+      sb.append("}");
+    } else if (type == MetadataPartitionType.BLOOM_FILTERS.getRecordType()) {
+      checkState(getBloomFilterMetadata().isPresent());
+      sb.append("BloomFilter: {");
+      sb.append("bloom size: 
").append(getBloomFilterMetadata().get().getBloomFilter().array().length).append(",
 ");
+      sb.append("timestamp: 
").append(getBloomFilterMetadata().get().getTimestamp()).append(", ");
+      sb.append("deleted: 
").append(getBloomFilterMetadata().get().getIsDeleted());
+      sb.append("}");
+    } else if (type == MetadataPartitionType.COLUMN_STATS.getRecordType()) {
+      checkState(getColumnStatMetadata().isPresent());
+      sb.append("ColStats: {");
+      sb.append(getColumnStatMetadata().get());
+      sb.append("}");
+    } else if (type == MetadataPartitionType.RECORD_INDEX.getRecordType()) {
+      sb.append("RecordIndex: {");
+      sb.append("location=").append(getRecordGlobalLocation());
+      sb.append("}");
     }
     sb.append('}');
     return sb.toString();
   }
-
-  private static void validatePayload(int type, Map<String, 
HoodieMetadataFileInfo> filesystemMetadata) {
-    if (type == METADATA_TYPE_FILE_LIST) {
-      filesystemMetadata.forEach((fileName, fileInfo) -> {
-        checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0, 
"Existing files should have size > 0");
-      });
-    }
-  }
-
-  private static <T> T getNestedFieldValue(GenericRecord record, String 
fieldName) {
-    // NOTE: This routine is more lightweight than {@code 
HoodieAvroUtils.getNestedFieldVal}
-    if (record.getSchema().getField(fieldName) == null) {
-      return null;
-    }
-
-    return unsafeCast(record.get(fieldName));
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 817fc2f535f..cbfb42d104a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.DoubleWrapper;
 import org.apache.hudi.avro.model.FloatWrapper;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
 import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -79,6 +80,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Tuple3;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
@@ -118,6 +121,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
@@ -131,6 +135,7 @@ import static 
org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.getSchemaForFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
@@ -140,6 +145,7 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_O
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_MISSING_FILEINDEX_FALLBACK;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
@@ -2141,6 +2147,118 @@ public class HoodieTableMetadataUtil {
     return getFileStatsRangeMetadata(writeStat.getPartitionPath(), 
writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
   }
 
+  public static String getPartitionStatsIndexKey(String partitionPath, String 
columnName) {
+    final PartitionIndexID partitionIndexID = new 
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionPath));
+    final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
+    return 
columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public static HoodieMetadataColumnStats 
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
+                                                                  
HoodieMetadataColumnStats newColumnStats) {
+    checkArgument(Objects.equals(prevColumnStats.getColumnName(), 
newColumnStats.getColumnName()));
+
+    // We're handling 2 cases in here
+    //  - New record is a tombstone: in this case it simply overwrites 
previous state
+    //  - Previous record is a tombstone: in that case new proper record would 
also
+    //    be simply overwriting previous state
+    if (newColumnStats.getIsDeleted() || prevColumnStats.getIsDeleted()) {
+      return newColumnStats;
+    }
+
+    Comparable minValue =
+        (Comparable) Stream.of(
+                (Comparable) 
unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
+                (Comparable) 
unwrapAvroValueWrapper(newColumnStats.getMinValue()))
+            .filter(Objects::nonNull)
+            .min(Comparator.naturalOrder())
+            .orElse(null);
+
+    Comparable maxValue =
+        (Comparable) Stream.of(
+                (Comparable) 
unwrapAvroValueWrapper(prevColumnStats.getMaxValue()),
+                (Comparable) 
unwrapAvroValueWrapper(newColumnStats.getMaxValue()))
+            .filter(Objects::nonNull)
+            .max(Comparator.naturalOrder())
+            .orElse(null);
+
+    return 
HoodieMetadataColumnStats.newBuilder(HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB.get())
+        .setFileName(newColumnStats.getFileName())
+        .setColumnName(newColumnStats.getColumnName())
+        .setMinValue(wrapValueIntoAvro(minValue))
+        .setMaxValue(wrapValueIntoAvro(maxValue))
+        .setValueCount(prevColumnStats.getValueCount() + 
newColumnStats.getValueCount())
+        .setNullCount(prevColumnStats.getNullCount() + 
newColumnStats.getNullCount())
+        .setTotalSize(prevColumnStats.getTotalSize() + 
newColumnStats.getTotalSize())
+        .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() + 
newColumnStats.getTotalUncompressedSize())
+        .setIsDeleted(newColumnStats.getIsDeleted())
+        .build();
+  }
+
+  public static Map<String, HoodieMetadataFileInfo> 
combineFileSystemMetadata(HoodieMetadataPayload older, HoodieMetadataPayload 
newer) {
+    Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+    // First, add all files listed in the previous record
+    if (older.filesystemMetadata != null) {
+      combinedFileInfo.putAll(older.filesystemMetadata);
+    }
+
+    // Second, merge in the files listed in the new record
+    if (newer.filesystemMetadata != null) {
+      validatePayload(newer.type, newer.filesystemMetadata);
+
+      newer.filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking 
precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being 
deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply 
delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we 
have to persist tombstone
+            //       record in the listing to make sure we carry forward 
information that this file
+            //       was deleted. This special case could occur since the 
merging flow is 2-stage:
+            //          - First we merge records from all of the delta 
log-files
+            //          - Then we merge records from base-files with the delta 
ones (coming as a result
+            //          of the previous step)
+            (oldFileInfo, newFileInfo) -> {
+              // NOTE: We can’t assume that MT update records will be ordered 
the same way as actual
+              //       FS operations (since they are not atomic), therefore MT 
record merging should be a
+              //       _commutative_ & _associative_ operation (ie one that 
would work even in case records
+              //       will get re-ordered), which is
+              //          - Possible for file-sizes (since file-sizes will 
ever grow, we can simply
+              //          take max of the old and new records)
+              //          - Not possible for is-deleted flags*
+              //
+              //       *However, we’re assuming that the case of concurrent 
write and deletion of the same
+              //       file is _impossible_ -- it would only be possible with 
concurrent upsert and
+              //       rollback operation (affecting the same log-file), which 
is implausible, b/c either
+              //       of the following have to be true:
+              //          - We’re appending to failed log-file (then the other 
writer is trying to
+              //          rollback it concurrently, before it’s own write)
+              //          - Rollback (of completed instant) is running 
concurrently with append (meaning
+              //          that restore is running concurrently with a write, 
which is also nut supported
+              //          currently)
+              if (newFileInfo.getIsDeleted()) {
+                if (oldFileInfo.getIsDeleted()) {
+                  LOG.warn("A file is repeatedly deleted in the files 
partition of the metadata table: {}", key);
+                  return newFileInfo;
+                }
+                return null;
+              }
+              return new HoodieMetadataFileInfo(
+                  Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), 
false);
+            });
+      });
+    }
+    return combinedFileInfo;
+  }
+
+  private static void validatePayload(int type, Map<String, 
HoodieMetadataFileInfo> filesystemMetadata) {
+    if (type == MetadataPartitionType.FILES.getRecordType()) {
+      filesystemMetadata.forEach((fileName, fileInfo) -> 
checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0, "Existing files 
should have size > 0"));
+    }
+  }
+
   /**
    * A class which represents a directory and the files and directories inside 
it.
    * <p>
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index d5854d4ec6f..a6223150bcc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -18,15 +18,27 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
+import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
+import org.apache.hudi.avro.model.HoodieSecondaryIndexInfo;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
@@ -37,36 +49,135 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_BLOOM_FILTER;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_IS_DELETED;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_TIMESTAMP;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_TYPE;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_IS_DELETED;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_TOTAL_SIZE;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_HIGH_BITS;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_LOW_BITS;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILE_INDEX;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_INSTANT_TIME;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_PARTITION;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_POSITION;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_BLOOM_FILTER;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_RECORD_INDEX;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_NAME_METADATA;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_FIELD_IS_DELETED;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_FIELD_RECORD_KEY;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.combineFileSystemMetadata;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.mergeColumnStatsRecords;
 
 /**
  * Partition types for metadata table.
  */
 public enum MetadataPartitionType {
-  FILES(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-") {
+  FILES(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 2) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
       return getBooleanWithAltKeys(writeConfig, ENABLE);
     }
+
+    @Override
+    public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+      constructFilesMetadataPayload(payload, record);
+    }
+
+    @Override
+    public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload 
older, HoodieMetadataPayload newer) {
+      return new HoodieMetadataPayload(newer.key, newer.type, 
combineFileSystemMetadata(older, newer));
+    }
   },
-  COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, 
"col-stats-") {
+  COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, 
"col-stats-", 3) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
       return getBooleanWithAltKeys(writeConfig, 
ENABLE_METADATA_INDEX_COLUMN_STATS);
     }
+
+    @Override
+    public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+      constructColumnStatsMetadataPayload(payload, record);
+    }
+
+    @Override
+    public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload 
older, HoodieMetadataPayload newer) {
+      checkArgument(older.getColumnStatMetadata().isPresent());
+      checkArgument(newer.getColumnStatMetadata().isPresent());
+
+      HoodieMetadataColumnStats previousColStatsRecord = 
older.getColumnStatMetadata().get();
+      HoodieMetadataColumnStats newColumnStatsRecord = 
newer.getColumnStatMetadata().get();
+
+      return new HoodieMetadataPayload(newer.key, 
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord));
+    }
   },
-  BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS, 
"bloom-filters-") {
+  BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS, 
"bloom-filters-", 4) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
       return getBooleanWithAltKeys(writeConfig, 
ENABLE_METADATA_INDEX_BLOOM_FILTER);
     }
+
+    @Override
+    public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+      GenericRecord bloomFilterRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_BLOOM_FILTER);
+      // NOTE: Only legitimate reason for {@code BloomFilterMetadata} to not 
be present is when
+      //       it's not been read from the storage (ie it's not been a part of 
projected schema).
+      //       Otherwise, it has to be present or the record would be 
considered invalid
+      if (bloomFilterRecord == null) {
+        
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_BLOOM_FILTER) == null,
+            String.format("Valid %s record expected for type: %s", 
SCHEMA_FIELD_ID_BLOOM_FILTER, 
MetadataPartitionType.BLOOM_FILTERS.getRecordType()));
+      } else {
+        payload.bloomFilterMetadata = new HoodieMetadataBloomFilter(
+            (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TYPE),
+            (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TIMESTAMP),
+            (ByteBuffer) 
bloomFilterRecord.get(BLOOM_FILTER_FIELD_BLOOM_FILTER),
+            (Boolean) bloomFilterRecord.get(BLOOM_FILTER_FIELD_IS_DELETED)
+        );
+      }
+    }
+
+    @Override
+    public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload 
older, HoodieMetadataPayload newer) {
+      // Bloom filters are always additive. No need to merge with previous 
bloom filter
+      return new HoodieMetadataPayload(newer.key, newer.bloomFilterMetadata);
+    }
   },
-  RECORD_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, 
"record-index-") {
+  RECORD_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, 
"record-index-", 5) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
       return getBooleanWithAltKeys(writeConfig, RECORD_INDEX_ENABLE_PROP);
     }
+
+    @Override
+    public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+      GenericRecord recordIndexRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_RECORD_INDEX);
+      Object recordIndexPosition = 
recordIndexRecord.get(RECORD_INDEX_FIELD_POSITION);
+      payload.recordIndexMetadata = new 
HoodieRecordIndexInfo(recordIndexRecord.get(RECORD_INDEX_FIELD_PARTITION).toString(),
+          
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_HIGH_BITS).toString()),
+          
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_LOW_BITS).toString()),
+          
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILE_INDEX).toString()),
+          recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID).toString(),
+          
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()),
+          
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_ENCODING).toString()),
+          recordIndexPosition != null ? 
Long.parseLong(recordIndexPosition.toString()) : null);
+    }
   },
-  
FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX,
 "func-index-") {
+  
FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX,
 "func-index-", -1) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
       // Functional index is created via sql and not via write path.
@@ -83,7 +194,7 @@ public enum MetadataPartitionType {
       return false;
     }
   },
-  
SECONDARY_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX, 
"secondary-index-") {
+  
SECONDARY_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX, 
"secondary-index-", 7) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
       // Secondary index is created via sql and not via write path.
@@ -99,18 +210,96 @@ public enum MetadataPartitionType {
       }
       return false;
     }
+
+    @Override
+    public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+      GenericRecord secondaryIndexRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_SECONDARY_INDEX);
+      checkState(secondaryIndexRecord != null, "Valid SecondaryIndexMetadata 
record expected for type: " + 
MetadataPartitionType.SECONDARY_INDEX.getRecordType());
+      payload.secondaryIndexMetadata = new HoodieSecondaryIndexInfo(
+          
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_RECORD_KEY).toString(),
+          (Boolean) 
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_IS_DELETED));
+    }
   },
-  PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, 
"partition-stats-") {
+  PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, 
"partition-stats-", 6) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
       return getBooleanWithAltKeys(writeConfig, 
ENABLE_METADATA_INDEX_PARTITION_STATS) && 
nonEmpty(getStringWithAltKeys(writeConfig, COLUMN_STATS_INDEX_FOR_COLUMNS, 
EMPTY_STRING));
     }
+
+    @Override
+    public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+      constructColumnStatsMetadataPayload(payload, record);
+    }
+  },
+  // ALL_PARTITIONS is just another record type in FILES partition
+  ALL_PARTITIONS(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 1) {
+    @Override
+    public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
+      return getBooleanWithAltKeys(writeConfig, ENABLE);
+    }
+
+    @Override
+    public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+      MetadataPartitionType.constructFilesMetadataPayload(payload, record);
+    }
+
+    @Override
+    public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload 
older, HoodieMetadataPayload newer) {
+      return new HoodieMetadataPayload(newer.key, newer.type, 
combineFileSystemMetadata(older, newer));
+    }
   };
 
+  private static <T> T getNestedFieldValue(GenericRecord record, String 
fieldName) {
+    // NOTE: This routine is more lightweight than {@code 
HoodieAvroUtils.getNestedFieldVal}
+    if (record.getSchema().getField(fieldName) == null) {
+      return null;
+    }
+
+    return unsafeCast(record.get(fieldName));
+  }
+
+  private static void constructFilesMetadataPayload(HoodieMetadataPayload 
payload, GenericRecord record) {
+    Map<String, HoodieMetadataFileInfo> metadata = getNestedFieldValue(record, 
SCHEMA_FIELD_NAME_METADATA);
+    if (metadata != null) {
+      payload.filesystemMetadata = metadata;
+      payload.filesystemMetadata.keySet().forEach(k -> {
+        GenericRecord v = payload.filesystemMetadata.get(k);
+        payload.filesystemMetadata.put(k, new HoodieMetadataFileInfo((Long) 
v.get("size"), (Boolean) v.get("isDeleted")));
+      });
+    }
+  }
+
+  private static void 
constructColumnStatsMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+    GenericRecord columnStatsRecord = getNestedFieldValue(record, 
SCHEMA_FIELD_ID_COLUMN_STATS);
+    // NOTE: Only legitimate reason for {@code ColumnStatsMetadata} to not be 
present is when
+    //       it's not been read from the storage (ie it's not been a part of 
projected schema).
+    //       Otherwise, it has to be present or the record would be considered 
invalid
+    if (columnStatsRecord == null) {
+      checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) 
== null,
+          String.format("Valid %s record expected for type: %s", 
SCHEMA_FIELD_ID_COLUMN_STATS, 
MetadataPartitionType.COLUMN_STATS.getRecordType()));
+    } else {
+      payload.columnStatMetadata = 
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
+          .setFileName((String) 
columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
+          .setColumnName((String) 
columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
+          // AVRO-2377 1.9.2 Modified the type of 
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
+          // This causes Kryo to fail when deserializing a GenericRecord, See 
HUDI-5484.
+          // We should avoid using GenericRecord and convert GenericRecord 
into a serializable type.
+          
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
+          
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
+          .setValueCount((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
+          .setNullCount((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
+          .setTotalSize((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
+          .setTotalUncompressedSize((Long) 
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
+          .setIsDeleted((Boolean) 
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
+          .build();
+    }
+  }
+
   // Partition path in metadata table.
   private final String partitionPath;
   // FileId prefix used for all file groups in this partition.
   private final String fileIdPrefix;
+  private final int recordType;
 
   /**
    * Check if the metadata partition is enabled based on the metadata config.
@@ -124,9 +313,10 @@ public enum MetadataPartitionType {
     return metaClient.getTableConfig().isMetadataPartitionAvailable(this);
   }
 
-  MetadataPartitionType(final String partitionPath, final String fileIdPrefix) 
{
+  MetadataPartitionType(final String partitionPath, final String fileIdPrefix, 
final int recordType) {
     this.partitionPath = partitionPath;
     this.fileIdPrefix = fileIdPrefix;
+    this.recordType = recordType;
   }
 
   public String getPartitionPath() {
@@ -137,6 +327,37 @@ public enum MetadataPartitionType {
     return fileIdPrefix;
   }
 
+  public int getRecordType() {
+    return recordType;
+  }
+
+  /**
+   * Construct metadata payload from the given record.
+   */
+  public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
+    throw new UnsupportedOperationException("MetadataPayload construction not 
supported for partition type: " + this);
+  }
+
+  /**
+   * Merge old and new metadata payloads. By default, it returns the newer 
payload.
+   * Implementations can override this method to merge the payloads depending 
on the partition type.
+   */
+  public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload 
older, HoodieMetadataPayload newer) {
+    return newer;
+  }
+
+  /**
+   * Get the metadata partition type for the given record type.
+   */
+  public static MetadataPartitionType get(int type) {
+    for (MetadataPartitionType partitionType : values()) {
+      if (partitionType.getRecordType() == type) {
+        return partitionType;
+      }
+    }
+    throw new IllegalArgumentException("No MetadataPartitionType for record 
type: " + type);
+  }
+
   /**
    * Returns the list of metadata table partitions which require WriteStatus 
to track written records.
    * <p>
@@ -150,22 +371,31 @@ public enum MetadataPartitionType {
    * Returns the set of all metadata partition names.
    */
   public static Set<String> getAllPartitionPaths() {
-    return Arrays.stream(values())
+    return Arrays.stream(getValidValues())
         .map(MetadataPartitionType::getPartitionPath)
         .collect(Collectors.toSet());
   }
 
+  /**
+   * Returns the set of all valid metadata partition types. Prefer using this 
method over {@link #values()}.
+   */
+  public static MetadataPartitionType[] getValidValues() {
+    // ALL_PARTITIONS is just another record type in FILES partition
+    return EnumSet.complementOf(EnumSet.of(
+        ALL_PARTITIONS)).toArray(new MetadataPartitionType[0]);
+  }
+
   /**
    * Returns the list of metadata partition types enabled based on the 
metadata config and table config.
    */
   public static List<MetadataPartitionType> 
getEnabledPartitions(TypedProperties writeConfig, HoodieTableMetaClient 
metaClient) {
-    return Arrays.stream(values())
+    return Arrays.stream(getValidValues())
         .filter(partitionType -> 
partitionType.isMetadataPartitionEnabled(writeConfig) || 
partitionType.isMetadataPartitionAvailable(metaClient))
         .collect(Collectors.toList());
   }
 
   public static MetadataPartitionType fromPartitionPath(String partitionPath) {
-    for (MetadataPartitionType partitionType : values()) {
+    for (MetadataPartitionType partitionType : getValidValues()) {
       if (partitionPath.equals(partitionType.getPartitionPath()) || 
partitionPath.startsWith(partitionType.getPartitionPath())) {
         return partitionType;
       }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
index 31a98d875b2..c214b23458c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
@@ -57,6 +57,7 @@ public class TestMetadataPartitionType {
     int expectedEnabledPartitions;
     switch (partitionType) {
       case FILES:
+      case ALL_PARTITIONS:
       case FUNCTIONAL_INDEX:
       case SECONDARY_INDEX:
         metadataConfigBuilder.enable(true);
@@ -86,11 +87,11 @@ public class TestMetadataPartitionType {
 
     // Verify partition type is enabled due to config
     if (partitionType == MetadataPartitionType.FUNCTIONAL_INDEX || 
partitionType == MetadataPartitionType.SECONDARY_INDEX) {
-      assertEquals(1, enabledPartitions.size(), "FUNCTIONAL_INDEX should be 
enabled by SQL, only FILES is enabled in this case.");
+      assertEquals(1, enabledPartitions.size(), "FUNCTIONAL_INDEX or 
SECONDARY_INDEX should be enabled by SQL, only FILES is enabled in this case.");
       assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES));
     } else {
       assertEquals(expectedEnabledPartitions, enabledPartitions.size());
-      assertTrue(enabledPartitions.contains(partitionType));
+      assertTrue(enabledPartitions.contains(partitionType) || 
MetadataPartitionType.ALL_PARTITIONS.equals(partitionType));
     }
   }
 
@@ -171,4 +172,15 @@ public class TestMetadataPartitionType {
     assertEquals(MetadataPartitionType.PARTITION_STATS, 
MetadataPartitionType.fromPartitionPath("partition_stats"));
     assertThrows(IllegalArgumentException.class, () -> 
MetadataPartitionType.fromPartitionPath("unknown"));
   }
+
+  @Test
+  public void testGetMetadataPartitionRecordType() {
+    assertEquals(1, MetadataPartitionType.ALL_PARTITIONS.getRecordType());
+    assertEquals(2, MetadataPartitionType.FILES.getRecordType());
+    assertEquals(3, MetadataPartitionType.COLUMN_STATS.getRecordType());
+    assertEquals(4, MetadataPartitionType.BLOOM_FILTERS.getRecordType());
+    assertEquals(5, MetadataPartitionType.RECORD_INDEX.getRecordType());
+    assertEquals(6, MetadataPartitionType.PARTITION_STATS.getRecordType());
+    assertEquals(7, MetadataPartitionType.SECONDARY_INDEX.getRecordType());
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
index 9090b1c8a5d..2409d783de3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.hudi.dml
 
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
 import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.metadata.HoodieMetadataPayload.getPartitionStatsIndexKey
-
+import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
 import org.apache.spark.sql.functions.{col, from_json}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 
@@ -243,8 +242,8 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
           val result1DF = spark.sql(s"select * from 
hudi_filesystem_view('$identifier', 'price*')")
           result1DF.show(false)
           val result1Array = result1DF.select(
-              col("Partition_Path")
-            ).orderBy("Partition_Path").take(10)
+            col("Partition_Path")
+          ).orderBy("Partition_Path").take(10)
           checkAnswer(result1Array)(
             Seq("price=10.0"),
             Seq("price=10.0"),

Reply via email to