This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch partition-bucket-index
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d0b325739be34f26ca2cec65bc06dc3e1c60e173
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Mar 18 02:40:15 2025 +0530

    [HUDI-8345] Delete partition stats index for a partition that is deleted (#12953)
---
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 46 ++++++++++++++++------
 .../hudi/common/testutils/HoodieTestTable.java     |  1 +
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 37 ++++++++++++++++-
 3 files changed, 72 insertions(+), 12 deletions(-)
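
In short: convertMetadataToPartitionStatRecords now takes an isDeletePartition flag, and for a DELETE_PARTITION replace-commit it emits delete partition-stats records for every indexed column of each replaced partition instead of scanning write stats. A minimal sketch of that branch, condensed from the diff below (not the exact patch; it assumes, as the patch comment indicates, that HoodieColumnRangeMetadata.stub yields a placeholder range for the named column and that the boolean flags passed to createPartitionStatsRecords mark the records as delete payloads):

    // Sketch of the new DELETE_PARTITION branch (condensed from the patch below).
    boolean isDeletePartition = commitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION);
    if (isDeletePartition) {
      Map<String, List<String>> partitionToReplaceFileIds =
          ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
      List<String> partitionsToDelete = new ArrayList<>(partitionToReplaceFileIds.keySet());
      // One delete record per (replaced partition, indexed column) pair.
      return engineContext.parallelize(partitionsToDelete, partitionsToDelete.size()).flatMap(partition ->
          columnsToIndexSchemaMap.keySet().stream()
              .flatMap(column -> HoodieMetadataPayload.createPartitionStatsRecords(
                  partition,
                  Collections.singletonList(HoodieColumnRangeMetadata.stub("", column)),
                  true, true, Option.empty()))
              .iterator());
    }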

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 1a462b5965f..c7f33550262 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -65,6 +65,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -410,8 +411,9 @@ public class HoodieTableMetadataUtil {
       checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient),
           "Column stats partition must be enabled to generate partition stats. Please enable: " + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
       // Generate Hoodie Pair data of partition name and list of column range metadata for all the files in that partition
+      boolean isDeletePartition = commitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION);
       final HoodieData<HoodieRecord> partitionStatsRDD = convertMetadataToPartitionStatRecords(commitMetadata, context,
-          dataMetaClient, tableMetadata, metadataConfig, recordTypeOpt);
+          dataMetaClient, tableMetadata, metadataConfig, recordTypeOpt, isDeletePartition);
       partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), partitionStatsRDD);
     }
     if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath())) {
@@ -2609,16 +2611,7 @@ public class HoodieTableMetadataUtil {
 
   public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient,
                                                                                HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig,
-                                                                               Option<HoodieRecordType> recordTypeOpt) {
-    // In this function we fetch column range metadata for all new files part of commit metadata along with all the other files
-    // of the affected partitions. The column range metadata is grouped by partition name to generate HoodiePairData of partition name
-    // and list of column range metadata for that partition files. This pair data is then used to generate partition stat records.
-    List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    if (allWriteStats.isEmpty()) {
-      return engineContext.emptyHoodieData();
-    }
-
+                                                                               Option<HoodieRecordType> recordTypeOpt, boolean isDeletePartition) {
     try {
       Option<Schema> writerSchema =
           Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
@@ -2628,11 +2621,42 @@ public class HoodieTableMetadataUtil {
                       : Option.of(new Schema.Parser().parse(writerSchemaStr)));
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
       Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+      if (tableSchema.isEmpty()) {
+        return engineContext.emptyHoodieData();
+      }
       Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
       Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false, recordTypeOpt);
       if (columnsToIndexSchemaMap.isEmpty()) {
         return engineContext.emptyHoodieData();
       }
+
+      // if this is DELETE_PARTITION, then create delete metadata payload for all columns for partition_stats
+      if (isDeletePartition) {
+        HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata;
+        Map<String, List<String>> partitionToReplaceFileIds = replaceCommitMetadata.getPartitionToReplaceFileIds();
+        List<String> partitionsToDelete = new ArrayList<>(partitionToReplaceFileIds.keySet());
+        if (partitionToReplaceFileIds.isEmpty()) {
+          return engineContext.emptyHoodieData();
+        }
+        return engineContext.parallelize(partitionsToDelete, partitionsToDelete.size()).flatMap(partition -> {
+          Stream<HoodieRecord> columnRangeMetadata = columnsToIndexSchemaMap.keySet().stream()
+              .flatMap(column -> HoodieMetadataPayload.createPartitionStatsRecords(
+                  partition,
+                  Collections.singletonList(HoodieColumnRangeMetadata.stub("", column)),
+                  true, true, Option.empty()));
+          return columnRangeMetadata.iterator();
+        });
+      }
+
+      // In this function we fetch column range metadata for all new files part of commit metadata along with all the other files
+      // of the affected partitions. The column range metadata is grouped by partition name to generate HoodiePairData of partition name
+      // and list of column range metadata for that partition files. This pair data is then used to generate partition stat records.
+      List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(Collection::stream).collect(Collectors.toList());
+      if (allWriteStats.isEmpty()) {
+        return engineContext.emptyHoodieData();
+      }
+
       List<String> colsToIndex = new ArrayList<>(columnsToIndexSchemaMap.keySet());
       LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap.keySet());
       // Group by partitionPath and then gather write stats lists,
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index c51273a617e..e64f24e6d54 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -659,6 +659,7 @@ public class HoodieTestTable implements AutoCloseable {
         .setInputGroups(clusteringGroups).build());
 
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
+    replaceMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
     replacedFileIds.forEach(replacedFileId -> replaceMetadata.addReplaceFileId(partition, replacedFileId));
     replaceMetadata.setOperationType(operationType);
     if (newFileId.isPresent() && !StringUtils.isNullOrEmpty(newFileId.get())) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 32c6df0ba92..ec9cf372912 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -852,6 +852,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
   @ValueSource(booleans = Array(true, false))
   def testDeletePartitionsV2(usePartitionsToDeleteConfig: Boolean): Unit = {
     var (df1, fooTableModifier) = deletePartitionSetup()
+    validateDataAndPartitionStats(df1)
     var recordsToDelete = spark.emptyDataFrame
     if (usePartitionsToDeleteConfig) {
       fooTableModifier = fooTableModifier.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(),
@@ -867,13 +868,47 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
 
     fooTableModifier = fooTableModifier.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, recordsToDelete)
-    val snapshotDF3 = spark.read.format("org.apache.hudi").load(tempBasePath)
+    validateDataAndPartitionStats(recordsToDelete, isDeletePartition = true)
+    val snapshotDF3 = spark.read.format("hudi").load(tempBasePath)
     assertEquals(0, snapshotDF3.filter(entry => {
       val partitionPath = entry.getString(3)
       !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
     }).count())
   }
 
+  private def validateDataAndPartitionStats(inputDf: DataFrame = spark.emptyDataFrame, isDeletePartition: Boolean = false): Unit = {
+    val metaClient = createMetaClient(spark, tempBasePath)
+    val partitionStatsIndex = new PartitionStatsIndexSupport(
+      spark,
+      inputDf.schema,
+      HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(),
+      metaClient)
+    val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(List("partition", "ts"), shouldReadInMemory = true).collectAsList()
+    partitionStats.forEach(stat => {
+      assertTrue(stat.getColumnName.equals("partition") || stat.getColumnName.equals("ts"))
+    })
+    if (isDeletePartition) {
+      assertEquals(2, partitionStats.size())
+      // validate that each stat record has only DEFAULT_THIRD_PARTITION_PATH because the other two partitions were deleted
+      partitionStats.forEach(stat => {
+        assertEquals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, stat.getFileName)
+      })
+    } else {
+      // 3 partitions * 2 columns = 6 records
+      assertEquals(6, partitionStats.size())
+      partitionStats.forEach(stat => {
+        assertTrue(stat.getFileName.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
+          stat.getFileName.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH) ||
+          stat.getFileName.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH))
+      })
+      // validate that there are 2 records for each partition
+      val partitionStatsGrouped = partitionStats.asScala.groupBy(_.getFileName)
+      partitionStatsGrouped.foreach { case (_, stats) =>
+        assertEquals(2, stats.size)
+      }
+    }
+  }
+
   /**
    * Test case for deletion of partitions using wildcards
    * @param partition the name of the partition(s) to delete
