This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch partition-bucket-index in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d0b325739be34f26ca2cec65bc06dc3e1c60e173 Author: Sagar Sumit <[email protected]> AuthorDate: Tue Mar 18 02:40:15 2025 +0530 [HUDI-8345] Delete partition stats index for a partition that is deleted (#12953) --- .../hudi/metadata/HoodieTableMetadataUtil.java | 46 ++++++++++++++++------ .../hudi/common/testutils/HoodieTestTable.java | 1 + .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 37 ++++++++++++++++- 3 files changed, 72 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 1a462b5965f..c7f33550262 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -65,6 +65,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; @@ -410,8 +411,9 @@ public class HoodieTableMetadataUtil { checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient), "Column stats partition must be enabled to generate partition stats. Please enable: " + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()); // Generate Hoodie Pair data of partition name and list of column range metadata for all the files in that partition + boolean isDeletePartition = commitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION); final HoodieData<HoodieRecord> partitionStatsRDD = convertMetadataToPartitionStatRecords(commitMetadata, context, - dataMetaClient, tableMetadata, metadataConfig, recordTypeOpt); + dataMetaClient, tableMetadata, metadataConfig, recordTypeOpt, isDeletePartition); partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), partitionStatsRDD); } if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath())) { @@ -2609,16 +2611,7 @@ public class HoodieTableMetadataUtil { public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig, - Option<HoodieRecordType> recordTypeOpt) { - // In this function we fetch column range metadata for all new files part of commit metadata along with all the other files - // of the affected partitions. The column range metadata is grouped by partition name to generate HoodiePairData of partition name - // and list of column range metadata for that partition files. This pair data is then used to generate partition stat records. - List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(Collection::stream).collect(Collectors.toList()); - if (allWriteStats.isEmpty()) { - return engineContext.emptyHoodieData(); - } - + Option<HoodieRecordType> recordTypeOpt, boolean isDeletePartition) { try { Option<Schema> writerSchema = Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)) @@ -2628,11 +2621,42 @@ public class HoodieTableMetadataUtil { : Option.of(new Schema.Parser().parse(writerSchemaStr))); HoodieTableConfig tableConfig = dataMetaClient.getTableConfig(); Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema); + if (tableSchema.isEmpty()) { + return engineContext.emptyHoodieData(); + } Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema); Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false, recordTypeOpt); if (columnsToIndexSchemaMap.isEmpty()) { return engineContext.emptyHoodieData(); } + + // if this is DELETE_PARTITION, then create delete metadata payload for all columns for partition_stats + if (isDeletePartition) { + HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata; + Map<String, List<String>> partitionToReplaceFileIds = replaceCommitMetadata.getPartitionToReplaceFileIds(); + List<String> partitionsToDelete = new ArrayList<>(partitionToReplaceFileIds.keySet()); + if (partitionToReplaceFileIds.isEmpty()) { + return engineContext.emptyHoodieData(); + } + return engineContext.parallelize(partitionsToDelete, partitionsToDelete.size()).flatMap(partition -> { + Stream<HoodieRecord> columnRangeMetadata = columnsToIndexSchemaMap.keySet().stream() + .flatMap(column -> HoodieMetadataPayload.createPartitionStatsRecords( + partition, + Collections.singletonList(HoodieColumnRangeMetadata.stub("", column)), + true, true, Option.empty())); + return columnRangeMetadata.iterator(); + }); + } + + // In this function we fetch column range metadata for all new files part of commit metadata along with all the other files + // of the affected partitions. The column range metadata is grouped by partition name to generate HoodiePairData of partition name + // and list of column range metadata for that partition files. This pair data is then used to generate partition stat records. + List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + if (allWriteStats.isEmpty()) { + return engineContext.emptyHoodieData(); + } + List<String> colsToIndex = new ArrayList<>(columnsToIndexSchemaMap.keySet()); LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap.keySet()); // Group by partitionPath and then gather write stats lists, diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index c51273a617e..e64f24e6d54 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -659,6 +659,7 @@ public class HoodieTestTable implements AutoCloseable { .setInputGroups(clusteringGroups).build()); HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + replaceMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); replacedFileIds.forEach(replacedFileId -> replaceMetadata.addReplaceFileId(partition, replacedFileId)); replaceMetadata.setOperationType(operationType); if (newFileId.isPresent() && !StringUtils.isNullOrEmpty(newFileId.get())) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 32c6df0ba92..ec9cf372912 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -852,6 +852,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = { @ValueSource(booleans = Array(true, false)) def testDeletePartitionsV2(usePartitionsToDeleteConfig: Boolean): Unit = { var (df1, fooTableModifier) = deletePartitionSetup() + validateDataAndPartitionStats(df1) var recordsToDelete = spark.emptyDataFrame if (usePartitionsToDeleteConfig) { fooTableModifier = fooTableModifier.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), @@ -867,13 +868,47 @@ def testBulkInsertForDropPartitionColumn(): Unit = { fooTableModifier = fooTableModifier.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name()) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, recordsToDelete) - val snapshotDF3 = spark.read.format("org.apache.hudi").load(tempBasePath) + validateDataAndPartitionStats(recordsToDelete, isDeletePartition = true) + val snapshotDF3 = spark.read.format("hudi").load(tempBasePath) assertEquals(0, snapshotDF3.filter(entry => { val partitionPath = entry.getString(3) !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) }).count()) } + private def validateDataAndPartitionStats(inputDf: DataFrame = spark.emptyDataFrame, isDeletePartition: Boolean = false): Unit = { + val metaClient = createMetaClient(spark, tempBasePath) + val partitionStatsIndex = new PartitionStatsIndexSupport( + spark, + inputDf.schema, + HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(), + metaClient) + val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(List("partition", "ts"), shouldReadInMemory = true).collectAsList() + partitionStats.forEach(stat => { + assertTrue(stat.getColumnName.equals("partition") || stat.getColumnName.equals("ts")) + }) + if (isDeletePartition) { + assertEquals(2, partitionStats.size()) + // validate that each stat record has only DEFAULT_THIRD_PARTITION_PATH because the other two partitions were deleted + partitionStats.forEach(stat => { + assertEquals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, stat.getFileName) + }) + } else { + // 3 partitions * 2 columns = 6 records + assertEquals(6, partitionStats.size()) + partitionStats.forEach(stat => { + assertTrue(stat.getFileName.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || + stat.getFileName.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH) || + stat.getFileName.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)) + }) + // validate that there 2 records for each partition + val partitionStatsGrouped = partitionStats.asScala.groupBy(_.getFileName) + partitionStatsGrouped.foreach { case (_, stats) => + assertEquals(2, stats.size) + } + } + } + /** * Test case for deletion of partitions using wildcards * @param partition the name of the partition(s) to delete
