This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2994724ef5a [HUDI-8188] Add validation for partition stats index in HoodieMetadataTableValidator (#11921)
2994724ef5a is described below

commit 2994724ef5a1c30d46f3b8c095a08cb34d37c163
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Sep 27 08:49:39 2024 +0530

    [HUDI-8188] Add validation for partition stats index in HoodieMetadataTableValidator (#11921)
---
 .../utilities/HoodieMetadataTableValidator.java    | 182 ++++++++++++++++++---
 .../TestHoodieMetadataTableValidator.java          |  88 ++++++++++
 2 files changed, 244 insertions(+), 26 deletions(-)
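
For context, the new check is exposed via the --validate-partition-stats flag added in this commit. A minimal programmatic sketch, mirroring the test helper introduced below (this assumes an already initialized JavaSparkContext `jsc` and a Hudi table at `basePath`; the Config fields and constructor are the ones added or used in this diff):

    // imports assumed: org.apache.hudi.utilities.HoodieMetadataTableValidator,
    //                  org.apache.spark.api.java.JavaSparkContext
    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
    config.basePath = basePath;              // table under validation
    config.validatePartitionStats = true;    // flag added by this commit
    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
    boolean passed = validator.run();        // false (or an exception when ignoreFailed is unset) on failure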

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 6a0fd473248..672ba0163e3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -18,9 +18,13 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.PartitionStatsIndexSupport;
 import org.apache.hudi.async.HoodieAsyncService;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieCommonConfig;
@@ -28,6 +32,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
@@ -39,6 +44,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -61,6 +67,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
@@ -68,6 +75,7 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -81,6 +89,7 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.sql.functions;
@@ -91,6 +100,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -100,12 +110,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
+import scala.collection.JavaConverters;
 
 import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
@@ -243,6 +255,9 @@ public class HoodieMetadataTableValidator implements Serializable {
     if (cfg.validateAllColumnStats) {
       labelList.add("validate-all-column-stats");
     }
+    if (cfg.validatePartitionStats) {
+      labelList.add("validate-partition-stats");
+    }
     if (cfg.validateBloomFilters) {
       labelList.add("validate-bloom-filters");
     }
@@ -293,6 +308,9 @@ public class HoodieMetadataTableValidator implements Serializable {
     @Parameter(names = {"--validate-all-column-stats"}, description = "Validate column stats for all columns in the schema", required = false)
     public boolean validateAllColumnStats = false;
 
+    @Parameter(names = {"--validate-partition-stats"}, description = "Validate partition stats for all columns in the schema", required = false)
+    public boolean validatePartitionStats = false;
+
     @Parameter(names = {"--validate-bloom-filters"}, description = "Validate bloom filters of base files", required = false)
     public boolean validateBloomFilters = false;
@@ -369,6 +387,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           + "   --validate-all-file-groups " + validateAllFileGroups + ", \n"
           + "   --validate-last-n-file-slices " + validateLastNFileSlices + ", \n"
           + "   --validate-all-column-stats " + validateAllColumnStats + ", \n"
+          + "   --validate-partition-stats " + validatePartitionStats + ", \n"
           + "   --validate-bloom-filters " + validateBloomFilters + ", \n"
           + "   --validate-record-index-count " + validateRecordIndexCount + ", \n"
           + "   --validate-record-index-content " + validateRecordIndexContent + ", \n"
@@ -405,6 +424,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           && Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles)
           && Objects.equals(validateAllFileGroups, config.validateAllFileGroups)
           && Objects.equals(validateAllColumnStats, config.validateAllColumnStats)
+          && Objects.equals(validatePartitionStats, config.validatePartitionStats)
           && Objects.equals(validateBloomFilters, config.validateBloomFilters)
           && Objects.equals(validateRecordIndexCount, config.validateRecordIndexCount)
           && Objects.equals(validateRecordIndexContent, config.validateRecordIndexContent)
@@ -425,7 +445,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     @Override
     public int hashCode() {
       return Objects.hash(basePath, continuous, skipDataFilesForCleaning, validateLatestFileSlices,
-          validateLatestBaseFiles, validateAllFileGroups, validateAllColumnStats, validateBloomFilters,
+          validateLatestBaseFiles, validateAllFileGroups, validateAllColumnStats, validatePartitionStats, validateBloomFilters,
           validateRecordIndexCount, validateRecordIndexContent, numRecordIndexErrorSamples,
           viewStorageTypeForFSListing, viewStorageTypeForMetadata,
           minValidateIntervalSeconds, parallelism, recordIndexParallelism, ignoreFailed,
@@ -583,6 +603,20 @@ public class HoodieMetadataTableValidator implements Serializable {
         result.add(Pair.of(false, e));
       }
 
+      try {
+        if (cfg.validatePartitionStats) {
+          validatePartitionStats(metadataTableBasedContext, finalBaseFilesForCleaning, allPartitions);
+          result.add(Pair.of(true, null));
+        }
+      } catch (Exception e) {
+        LOG.error(
+            "Metadata table validation failed due to HoodieValidationException in partition stats validation for table: {} ", cfg.basePath, e);
+        if (!cfg.ignoreFailed) {
+          throw e;
+        }
+        result.add(Pair.of(false, e));
+      }
+
       for (Pair<Boolean, ? extends Exception> res : result) {
         finalResult &= res.getKey();
         if (res.getKey().equals(false)) {
@@ -600,6 +634,8 @@ public class HoodieMetadataTableValidator implements Serializable {
         LOG.warn("Metadata table validation failed ({}).", taskLabels);
         return false;
       }
+    } catch (HoodieValidationException e) {
+      throw e;
     } catch (Exception e) {
       LOG.warn("Error closing HoodieMetadataValidationContext, "
           + "ignoring the error as the validation is successful.", e);
@@ -914,6 +950,86 @@ public class HoodieMetadataTableValidator implements Serializable {
     validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
   }
 
+  private void validatePartitionStats(HoodieMetadataValidationContext metadataTableBasedContext, Set<String> baseDataFilesForCleaning, List<String> allPartitions) throws Exception {
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieData<HoodieMetadataColumnStats> partitionStatsUsingColStats = getPartitionStatsUsingColStats(metadataTableBasedContext,
+        baseDataFilesForCleaning, allPartitions, engineContext);
+
+    PartitionStatsIndexSupport partitionStatsIndexSupport = new PartitionStatsIndexSupport(engineContext.getSqlContext().sparkSession(),
+        AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema()), metadataTableBasedContext.getMetadataConfig(),
+        metaClient, false);
+    HoodieData<HoodieMetadataColumnStats> partitionStats =
+        partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(), false);
+    JavaRDD<HoodieMetadataColumnStats> diffRDD = HoodieJavaRDD.getJavaRDD(partitionStats).subtract(HoodieJavaRDD.getJavaRDD(partitionStatsUsingColStats));
+    if (!diffRDD.isEmpty()) {
+      List<HoodieMetadataColumnStats> diff = diffRDD.collect();
+      Set<String> partitionPaths = diff.stream().map(HoodieMetadataColumnStats::getFileName).collect(Collectors.toSet());
+      StringBuilder statDiffMsg = new StringBuilder();
+      for (String partitionPath : partitionPaths) {
+        List<HoodieMetadataColumnStats> diffPartitionStatsColStats = partitionStatsUsingColStats.filter(stat -> stat.getFileName().equals(partitionPath)).collectAsList();
+        List<HoodieMetadataColumnStats> diffPartitionStats = partitionStats.filter(stat -> stat.getFileName().equals(partitionPath)).collectAsList();
+        statDiffMsg.append(String.format("Partition stats from MDT: %s from colstats: %s", Arrays.toString(diffPartitionStats.toArray()), Arrays.toString(diffPartitionStatsColStats.toArray())));
+      }
+      throw new HoodieValidationException(String.format("Partition stats validation failed diff: %s", statDiffMsg));
+    }
+  }
+
+  private HoodieData<HoodieMetadataColumnStats> getPartitionStatsUsingColStats(HoodieMetadataValidationContext metadataTableBasedContext, Set<String> baseDataFilesForCleaning,
+                                                                               List<String> allPartitions, HoodieSparkEngineContext engineContext) {
+    return engineContext.parallelize(allPartitions).flatMap(partitionPath -> {
+      List<FileSlice> latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath),
+          baseDataFilesForCleaning);
+      List<String> latestFileNames = new ArrayList<>();
+      latestFileSlicesFromMetadataTable.stream().filter(fs -> fs.getBaseFile().isPresent()).forEach(fs -> {
+        latestFileNames.add(fs.getBaseFile().get().getFileName());
+        latestFileNames.addAll(fs.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+      });
+      List<HoodieColumnRangeMetadata<Comparable>> colStats = metadataTableBasedContext
+          .getSortedColumnStatsList(partitionPath, latestFileNames);
+
+      TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = aggregateColumnStats(partitionPath, colStats);
+      List<HoodieRecord> partitionStatRecords = HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, new ArrayList<>(aggregatedColumnStats), false)
+          .collect(Collectors.toList());
+      return partitionStatRecords.stream()
+          .map(record -> {
+            try {
+              return ((HoodieMetadataPayload) record.getData()).getInsertValue(null, null)
+                  .map(metadataRecord -> ((HoodieMetadataRecord) metadataRecord).getColumnStatsMetadata());
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          })
+          .filter(Option::isPresent)
+          .map(Option::get)
+          .collect(Collectors.toList())
+          .iterator();
+    });
+  }
+
+  /**
+   * Generates aggregated column stats, which serve as the partition stats for the given partition path.
+   *
+   * @param partitionPath Provided partition path
+   * @param colStats Column stat records for the partition
+   */
+  static TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregateColumnStats(String partitionPath, List<HoodieColumnRangeMetadata<Comparable>> colStats) {
+    TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = new TreeSet<>(Comparator.comparing(HoodieColumnRangeMetadata::getColumnName));
+    for (HoodieColumnRangeMetadata<Comparable> colStat : colStats) {
+      HoodieColumnRangeMetadata<Comparable> partitionStat = HoodieColumnRangeMetadata.create(partitionPath, colStat.getColumnName(),
+          colStat.getMinValue(), colStat.getMaxValue(), colStat.getNullCount(), colStat.getValueCount(), colStat.getTotalSize(), colStat.getTotalUncompressedSize());
+      HoodieColumnRangeMetadata<Comparable> storedPartitionStat = aggregatedColumnStats.floor(partitionStat);
+      if (storedPartitionStat == null || !storedPartitionStat.getColumnName().equals(partitionStat.getColumnName())) {
+        aggregatedColumnStats.add(partitionStat);
+        continue;
+      }
+      aggregatedColumnStats.remove(partitionStat);
+      aggregatedColumnStats.add(HoodieColumnRangeMetadata.merge(storedPartitionStat, partitionStat));
+    }
+    return aggregatedColumnStats;
+  }
+
   private void validateBloomFilters(
       HoodieMetadataValidationContext metadataTableBasedContext,
       HoodieMetadataValidationContext fsBasedContext,
@@ -1410,6 +1526,8 @@ public class HoodieMetadataTableValidator implements Serializable {
 
     private final Properties props;
     private final HoodieTableMetaClient metaClient;
+    private final HoodieMetadataConfig metadataConfig;
+    private final Schema schema;
     private final HoodieTableFileSystemView fileSystemView;
     private final HoodieTableMetadata tableMetadata;
     private final boolean enableMetadataTable;
@@ -1418,26 +1536,31 @@ public class HoodieMetadataTableValidator implements Serializable {
     public HoodieMetadataValidationContext(
         HoodieEngineContext engineContext, Properties props, HoodieTableMetaClient metaClient,
         boolean enableMetadataTable, String viewStorageType) {
-      this.props = new Properties();
-      this.props.putAll(props);
-      this.metaClient = metaClient;
-      this.enableMetadataTable = enableMetadataTable;
-      HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
-          .enable(enableMetadataTable)
-          .withMetadataIndexBloomFilter(enableMetadataTable)
-          .withMetadataIndexColumnStats(enableMetadataTable)
-          .withEnableRecordIndex(enableMetadataTable)
-          .build();
-      props.put(FileSystemViewStorageConfig.VIEW_TYPE.key(), viewStorageType);
-      FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder().fromProperties(props).build();
-      ValidationUtils.checkArgument(viewConf.getStorageType().name().equals(viewStorageType), "View storage type not reflected");
-      HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
-      this.fileSystemView = getFileSystemView(engineContext,
-          metaClient, metadataConfig, viewConf, commonConfig);
-      this.tableMetadata = HoodieTableMetadata.create(
-          engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePath().toString());
-      if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
-        this.allColumnNameList = getAllColumnNames();
+      try {
+        this.props = new Properties();
+        this.props.putAll(props);
+        this.metaClient = metaClient;
+        this.schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+        this.enableMetadataTable = enableMetadataTable;
+        this.metadataConfig = HoodieMetadataConfig.newBuilder()
+            .enable(enableMetadataTable)
+            .withMetadataIndexBloomFilter(enableMetadataTable)
+            .withMetadataIndexColumnStats(enableMetadataTable)
+            .withEnableRecordIndex(enableMetadataTable)
+            .build();
+        props.put(FileSystemViewStorageConfig.VIEW_TYPE.key(), viewStorageType);
+        FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder().fromProperties(props).build();
+        ValidationUtils.checkArgument(viewConf.getStorageType().name().equals(viewStorageType), "View storage type not reflected");
+        HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
+        this.fileSystemView = getFileSystemView(engineContext,
+            metaClient, metadataConfig, viewConf, commonConfig);
+        this.tableMetadata = HoodieTableMetadata.create(
+            engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePath().toString());
+        if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
+          this.allColumnNameList = getAllColumnNames();
+        }
+      } catch (Exception e) {
+        throw new HoodieException("Failed to initialize metadata validation context for " + metaClient.getBasePath(), e);
       }
     }
 
@@ -1462,6 +1585,14 @@ public class HoodieMetadataTableValidator implements Serializable {
       return metaClient;
     }
 
+    public HoodieMetadataConfig getMetadataConfig() {
+      return metadataConfig;
+    }
+
+    public Schema getSchema() {
+      return schema;
+    }
+
     public HoodieTableMetadata getTableMetadata() {
       return tableMetadata;
     }
@@ -1483,10 +1614,10 @@ public class HoodieMetadataTableValidator implements Serializable {
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(
-        String partitionPath, List<String> baseFileNameList) {
+        String partitionPath, List<String> fileNames) {
       LOG.info("All column names for getting column stats: {}", allColumnNameList);
       if (enableMetadataTable) {
-        List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+        List<Pair<String, String>> partitionFileNameList = fileNames.stream()
             .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
         return allColumnNameList.stream()
             .flatMap(columnName ->
@@ -1499,7 +1630,7 @@ public class HoodieMetadataTableValidator implements Serializable {
       } else {
         FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(metaClient.getStorage())
             .getFileFormatUtils(HoodieFileFormat.PARQUET);
-        return baseFileNameList.stream().flatMap(filename ->
+        return fileNames.stream().flatMap(filename ->
                 formatUtils.readColumnStatsFromMetadata(
                         metaClient.getStorage(),
                         new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath), filename),
@@ -1533,9 +1664,8 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
 
     private List<String> getAllColumnNames() {
-      TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
       try {
-        return schemaResolver.getTableAvroSchema().getFields().stream()
+        return schema.getFields().stream()
             .map(Schema.Field::name).collect(Collectors.toList());
       } catch (Exception e) {
         throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
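
A short worked example of the aggregation semantics implemented by aggregateColumnStats above, using the same values as the unit test added below (the file path and column name are illustrative, and the call assumes same-package access to the package-private helper):

    // imports assumed: org.apache.hudi.common.model.HoodieColumnRangeMetadata,
    //                  java.util.ArrayList, java.util.List, java.util.TreeSet
    List<HoodieColumnRangeMetadata<Comparable>> colStats = new ArrayList<>();
    // Two per-file ranges for "col1": [1, 5] with 0 nulls and [1, 10] with 5 nulls.
    colStats.add(HoodieColumnRangeMetadata.<Comparable>create("path/to/file1", "col1", 1, 5, 0, 10, 100, 200));
    colStats.add(HoodieColumnRangeMetadata.<Comparable>create("path/to/file1", "col1", 1, 10, 5, 10, 100, 200));
    TreeSet<HoodieColumnRangeMetadata<Comparable>> agg =
        HoodieMetadataTableValidator.aggregateColumnStats("path/to/file1", colStats);
    // One aggregated entry for "col1": min=1, max=10, nullCount=5, valueCount=20,
    // totalSize=200, totalUncompressedSize=400 (ranges merged, counts and sizes summed).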
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 2181d7a8f12..5e0d69899a1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -61,10 +62,12 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -97,6 +100,47 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     );
   }
 
+  @Test
+  public void testAggregateColumnStats() {
+    HoodieColumnRangeMetadata<Comparable> fileColumn1Range1 = HoodieColumnRangeMetadata.<Comparable>create(
+        "path/to/file1", "col1", 1, 5, 0, 10, 100, 200);
+    HoodieColumnRangeMetadata<Comparable> fileColumn1Range2 = HoodieColumnRangeMetadata.<Comparable>create(
+        "path/to/file1", "col1", 1, 10, 5, 10, 100, 200);
+    HoodieColumnRangeMetadata<Comparable> fileColumn2Range1 = HoodieColumnRangeMetadata.<Comparable>create(
+        "path/to/file1", "col2", 3, 8, 1, 15, 120, 250);
+    HoodieColumnRangeMetadata<Comparable> fileColumn2Range2 = HoodieColumnRangeMetadata.<Comparable>create(
+        "path/to/file1", "col2", 5, 9, 4, 5, 80, 150);
+    List<HoodieColumnRangeMetadata<Comparable>> colStats = new ArrayList<>();
+    colStats.add(fileColumn1Range1);
+    colStats.add(fileColumn1Range2);
+    colStats.add(fileColumn2Range1);
+    colStats.add(fileColumn2Range2);
+
+    int col1Count = 0;
+    int col2Count = 0;
+    // Ensure merge logic for column stats is correct and aggregate logic creates two entries for two columns
+    TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedStats = HoodieMetadataTableValidator.aggregateColumnStats("path/to/file1", colStats);
+    assertEquals(2, aggregatedStats.size());
+    for (HoodieColumnRangeMetadata<Comparable> stat : aggregatedStats) {
+      if (stat.getColumnName().equals("col1")) {
+        assertEquals(1, stat.getMinValue());
+        assertEquals(10, stat.getMaxValue());
+        col1Count++;
+      } else if (stat.getColumnName().equals("col2")) {
+        assertEquals(3, stat.getMinValue());
+        assertEquals(9, stat.getMaxValue());
+        col2Count++;
+      }
+
+      assertEquals(5, stat.getNullCount());
+      assertEquals(20, stat.getValueCount());
+      assertEquals(200, stat.getTotalSize());
+      assertEquals(400, stat.getTotalUncompressedSize());
+    }
+    assertEquals(1, col1Count);
+    assertEquals(1, col2Count);
+  }
+
   @ParameterizedTest
   @MethodSource("viewStorageArgs")
  public void testMetadataTableValidation(String viewStorageTypeForFSListing, String viewStorageTypeForMDTListing) {
@@ -139,6 +183,50 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     assertTrue(validator.getThrowables().isEmpty());
   }
 
+  @Test
+  public void testPartitionStatsValidation() {
+    // TODO: Add validation for compaction and clustering cases
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+    writeOptions.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
+
+    Dataset<Row> inserts = makeInsertDf("000", 5);
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true")
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    // validate MDT partition stats
+    validatePartitionStats();
+
+    Dataset<Row> updates = makeUpdateDf("001", 5);
+    updates.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+        .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true")
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    // validate MDT partition stats
+    validatePartitionStats();
+  }
+
+  private void validatePartitionStats() {
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = false;
+    config.validateAllFileGroups = false;
+    config.validatePartitionStats = true;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    assertTrue(validator.run());
+    assertFalse(validator.hasValidationFailure());
+    assertTrue(validator.getThrowables().isEmpty());
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
  public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {
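
As background on the comparison strategy in validatePartitionStats above: JavaRDD.subtract returns the elements of the left RDD that are absent from the right, so any record in the metadata table's partition stats index that cannot be re-derived from the column stats surfaces as a difference. A minimal sketch, assuming an initialized JavaSparkContext `jsc` and plain strings standing in for HoodieMetadataColumnStats records:

    // imports assumed: org.apache.spark.api.java.JavaRDD,
    //                  java.util.Arrays, java.util.List
    JavaRDD<String> fromIndex = jsc.parallelize(Arrays.asList("col1:[1,10]", "col2:[3,9]"));
    JavaRDD<String> recomputed = jsc.parallelize(Arrays.asList("col1:[1,10]"));
    // Elements on the left but not on the right: here, ["col2:[3,9]"].
    List<String> diff = fromIndex.subtract(recomputed).collect();
    if (!diff.isEmpty()) {
      throw new HoodieValidationException("Partition stats validation failed diff: " + diff);
    }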
