This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2994724ef5a [HUDI-8188] Add validation for partition stats index in HoodieMetadataTableValidator (#11921)
2994724ef5a is described below
commit 2994724ef5a1c30d46f3b8c095a08cb34d37c163
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Sep 27 08:49:39 2024 +0530
[HUDI-8188] Add validation for partition stats index in HoodieMetadataTableValidator (#11921)
---
.../utilities/HoodieMetadataTableValidator.java | 182 ++++++++++++++++++---
.../TestHoodieMetadataTableValidator.java | 88 ++++++++++
2 files changed, 244 insertions(+), 26 deletions(-)
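For readers who want to try the new check before digging into the diff: the patch adds a --validate-partition-stats flag to the validator's Config. Below is a minimal sketch of driving it programmatically, mirroring the test helper added at the end of this patch; the local Spark context and table path are assumptions for illustration, not part of this commit.

import org.apache.hudi.utilities.HoodieMetadataTableValidator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PartitionStatsValidationDemo {
  public static void main(String[] args) {
    // Local Spark context purely for illustration; any JavaSparkContext works.
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("mdt-validator-demo");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
      config.basePath = "/tmp/hudi_table";   // assumption: path to an existing Hudi table
      config.validatePartitionStats = true;  // the new flag introduced by this commit
      HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
      // Per the tests below, run() returns true when validation passes.
      System.out.println("partition stats validation passed: " + validator.run());
    }
  }
}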
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 6a0fd473248..672ba0163e3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -18,9 +18,13 @@
package org.apache.hudi.utilities;
+import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.PartitionStatsIndexSupport;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieCommonConfig;
@@ -28,6 +32,7 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
@@ -39,6 +44,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -61,6 +67,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
@@ -68,6 +75,7 @@ import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -81,6 +89,7 @@ import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.functions;
@@ -91,6 +100,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -100,12 +110,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import scala.Tuple2;
+import scala.collection.JavaConverters;
import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
@@ -243,6 +255,9 @@ public class HoodieMetadataTableValidator implements Serializable {
if (cfg.validateAllColumnStats) {
labelList.add("validate-all-column-stats");
}
+ if (cfg.validatePartitionStats) {
+ labelList.add("validate-partition-stats");
+ }
if (cfg.validateBloomFilters) {
labelList.add("validate-bloom-filters");
}
@@ -293,6 +308,9 @@ public class HoodieMetadataTableValidator implements Serializable {
@Parameter(names = {"--validate-all-column-stats"}, description = "Validate column stats for all columns in the schema", required = false)
public boolean validateAllColumnStats = false;
+ @Parameter(names = {"--validate-partition-stats"}, description = "Validate partition stats for all columns in the schema", required = false)
+ public boolean validatePartitionStats = false;
+
@Parameter(names = {"--validate-bloom-filters"}, description = "Validate bloom filters of base files", required = false)
public boolean validateBloomFilters = false;
@@ -369,6 +387,7 @@ public class HoodieMetadataTableValidator implements Serializable {
+ " --validate-all-file-groups " + validateAllFileGroups + ", \n"
+ " --validate-last-n-file-slices " + validateLastNFileSlices + ", \n"
+ " --validate-all-column-stats " + validateAllColumnStats + ", \n"
+ + " --validate-partition-stats " + validatePartitionStats + ", \n"
+ " --validate-bloom-filters " + validateBloomFilters + ", \n"
+ " --validate-record-index-count " + validateRecordIndexCount + ", \n"
+ " --validate-record-index-content " + validateRecordIndexContent + ", \n"
@@ -405,6 +424,7 @@ public class HoodieMetadataTableValidator implements Serializable {
&& Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles)
&& Objects.equals(validateAllFileGroups, config.validateAllFileGroups)
&& Objects.equals(validateAllColumnStats, config.validateAllColumnStats)
+ && Objects.equals(validatePartitionStats, config.validatePartitionStats)
&& Objects.equals(validateBloomFilters, config.validateBloomFilters)
&& Objects.equals(validateRecordIndexCount, config.validateRecordIndexCount)
&& Objects.equals(validateRecordIndexContent, config.validateRecordIndexContent)
@@ -425,7 +445,7 @@ public class HoodieMetadataTableValidator implements Serializable {
@Override
public int hashCode() {
return Objects.hash(basePath, continuous, skipDataFilesForCleaning, validateLatestFileSlices,
- validateLatestBaseFiles, validateAllFileGroups, validateAllColumnStats, validateBloomFilters,
+ validateLatestBaseFiles, validateAllFileGroups, validateAllColumnStats, validatePartitionStats, validateBloomFilters,
validateRecordIndexCount, validateRecordIndexContent, numRecordIndexErrorSamples,
viewStorageTypeForFSListing, viewStorageTypeForMetadata, minValidateIntervalSeconds, parallelism, recordIndexParallelism, ignoreFailed,
@@ -583,6 +603,20 @@ public class HoodieMetadataTableValidator implements Serializable {
result.add(Pair.of(false, e));
}
+ try {
+ if (cfg.validatePartitionStats) {
+ validatePartitionStats(metadataTableBasedContext, finalBaseFilesForCleaning, allPartitions);
+ result.add(Pair.of(true, null));
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Metadata table validation failed due to HoodieValidationException in partition stats validation for table: {} ", cfg.basePath, e);
+ if (!cfg.ignoreFailed) {
+ throw e;
+ }
+ result.add(Pair.of(false, e));
+ }
+
for (Pair<Boolean, ? extends Exception> res : result) {
finalResult &= res.getKey();
if (res.getKey().equals(false)) {
@@ -600,6 +634,8 @@ public class HoodieMetadataTableValidator implements Serializable {
LOG.warn("Metadata table validation failed ({}).", taskLabels);
return false;
}
+ } catch (HoodieValidationException e) {
+ throw e;
} catch (Exception e) {
LOG.warn("Error closing HoodieMetadataValidationContext, "
+ "ignoring the error as the validation is successful.", e);
@@ -914,6 +950,86 @@ public class HoodieMetadataTableValidator implements Serializable {
validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
}
+ private void validatePartitionStats(HoodieMetadataValidationContext metadataTableBasedContext, Set<String> baseDataFilesForCleaning, List<String> allPartitions) throws Exception {
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieData<HoodieMetadataColumnStats> partitionStatsUsingColStats = getPartitionStatsUsingColStats(metadataTableBasedContext,
+ baseDataFilesForCleaning, allPartitions, engineContext);
+
+ PartitionStatsIndexSupport partitionStatsIndexSupport = new PartitionStatsIndexSupport(engineContext.getSqlContext().sparkSession(),
+ AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema()), metadataTableBasedContext.getMetadataConfig(),
+ metaClient, false);
+ HoodieData<HoodieMetadataColumnStats> partitionStats =
+ partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(), false);
+ JavaRDD<HoodieMetadataColumnStats> diffRDD = HoodieJavaRDD.getJavaRDD(partitionStats).subtract(HoodieJavaRDD.getJavaRDD(partitionStatsUsingColStats));
+ if (!diffRDD.isEmpty()) {
+ List<HoodieMetadataColumnStats> diff = diffRDD.collect();
+ Set<String> partitionPaths = diff.stream().map(HoodieMetadataColumnStats::getFileName).collect(Collectors.toSet());
+ StringBuilder statDiffMsg = new StringBuilder();
+ for (String partitionPath : partitionPaths) {
+ List<HoodieMetadataColumnStats> diffPartitionStatsColStats = partitionStatsUsingColStats.filter(stat -> stat.getFileName().equals(partitionPath)).collectAsList();
+ List<HoodieMetadataColumnStats> diffPartitionStats = partitionStats.filter(stat -> stat.getFileName().equals(partitionPath)).collectAsList();
+ statDiffMsg.append(String.format("Partition stats from MDT: %s from colstats: %s", Arrays.toString(diffPartitionStats.toArray()), Arrays.toString(diffPartitionStatsColStats.toArray())));
+ }
+ throw new HoodieValidationException(String.format("Partition stats validation failed diff: %s", statDiffMsg));
+ }
+ }
+
+ private HoodieData<HoodieMetadataColumnStats> getPartitionStatsUsingColStats(HoodieMetadataValidationContext metadataTableBasedContext, Set<String> baseDataFilesForCleaning,
+ List<String> allPartitions, HoodieSparkEngineContext engineContext) {
+ return engineContext.parallelize(allPartitions).flatMap(partitionPath -> {
+ List<FileSlice> latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath),
+ baseDataFilesForCleaning);
+ List<String> latestFileNames = new ArrayList<>();
+ latestFileSlicesFromMetadataTable.stream().filter(fs -> fs.getBaseFile().isPresent()).forEach(fs -> {
+ latestFileNames.add(fs.getBaseFile().get().getFileName());
+ latestFileNames.addAll(fs.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+ });
+ List<HoodieColumnRangeMetadata<Comparable>> colStats = metadataTableBasedContext
+ .getSortedColumnStatsList(partitionPath, latestFileNames);
+
+ TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = aggregateColumnStats(partitionPath, colStats);
+ List<HoodieRecord> partitionStatRecords = HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, new ArrayList<>(aggregatedColumnStats), false)
+ .collect(Collectors.toList());
+ return partitionStatRecords.stream()
+ .map(record -> {
+ try {
+ return ((HoodieMetadataPayload) record.getData()).getInsertValue(null, null)
+ .map(metadataRecord -> ((HoodieMetadataRecord) metadataRecord).getColumnStatsMetadata());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .filter(Option::isPresent)
+ .map(Option::get)
+ .collect(Collectors.toList())
+ .iterator();
+ });
+ }
+
+ /**
+ * Generates aggregated column stats, which also serve as the partition stats for the given
+ * partition path.
+ *
+ * @param partitionPath Provided partition path
+ * @param colStats Column stat records for the partition
+ */
+ static TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregateColumnStats(String partitionPath, List<HoodieColumnRangeMetadata<Comparable>> colStats) {
+ TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = new TreeSet<>(Comparator.comparing(HoodieColumnRangeMetadata::getColumnName));
+ for (HoodieColumnRangeMetadata<Comparable> colStat : colStats) {
+ HoodieColumnRangeMetadata<Comparable> partitionStat = HoodieColumnRangeMetadata.create(partitionPath, colStat.getColumnName(),
+ colStat.getMinValue(), colStat.getMaxValue(), colStat.getNullCount(), colStat.getValueCount(), colStat.getTotalSize(), colStat.getTotalUncompressedSize());
+ HoodieColumnRangeMetadata<Comparable> storedPartitionStat = aggregatedColumnStats.floor(partitionStat);
+ if (storedPartitionStat == null || !storedPartitionStat.getColumnName().equals(partitionStat.getColumnName())) {
+ aggregatedColumnStats.add(partitionStat);
+ continue;
+ }
+ aggregatedColumnStats.remove(partitionStat);
+ aggregatedColumnStats.add(HoodieColumnRangeMetadata.merge(storedPartitionStat, partitionStat));
+ }
+ return aggregatedColumnStats;
+ }
+
private void validateBloomFilters(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext,
@@ -1410,6 +1526,8 @@ public class HoodieMetadataTableValidator implements Serializable {
private final Properties props;
private final HoodieTableMetaClient metaClient;
+ private final HoodieMetadataConfig metadataConfig;
+ private final Schema schema;
private final HoodieTableFileSystemView fileSystemView;
private final HoodieTableMetadata tableMetadata;
private final boolean enableMetadataTable;
@@ -1418,26 +1536,31 @@ public class HoodieMetadataTableValidator implements Serializable {
public HoodieMetadataValidationContext(
HoodieEngineContext engineContext, Properties props, HoodieTableMetaClient metaClient,
boolean enableMetadataTable, String viewStorageType) {
- this.props = new Properties();
- this.props.putAll(props);
- this.metaClient = metaClient;
- this.enableMetadataTable = enableMetadataTable;
- HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
- .enable(enableMetadataTable)
- .withMetadataIndexBloomFilter(enableMetadataTable)
- .withMetadataIndexColumnStats(enableMetadataTable)
- .withEnableRecordIndex(enableMetadataTable)
- .build();
- props.put(FileSystemViewStorageConfig.VIEW_TYPE.key(), viewStorageType);
- FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder().fromProperties(props).build();
- ValidationUtils.checkArgument(viewConf.getStorageType().name().equals(viewStorageType), "View storage type not reflected");
- HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
- this.fileSystemView = getFileSystemView(engineContext,
- metaClient, metadataConfig, viewConf, commonConfig);
- this.tableMetadata = HoodieTableMetadata.create(
- engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePath().toString());
- if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
- this.allColumnNameList = getAllColumnNames();
+ try {
+ this.props = new Properties();
+ this.props.putAll(props);
+ this.metaClient = metaClient;
+ this.schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+ this.enableMetadataTable = enableMetadataTable;
+ this.metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(enableMetadataTable)
+ .withMetadataIndexBloomFilter(enableMetadataTable)
+ .withMetadataIndexColumnStats(enableMetadataTable)
+ .withEnableRecordIndex(enableMetadataTable)
+ .build();
+ props.put(FileSystemViewStorageConfig.VIEW_TYPE.key(), viewStorageType);
+ FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder().fromProperties(props).build();
+ ValidationUtils.checkArgument(viewConf.getStorageType().name().equals(viewStorageType), "View storage type not reflected");
+ HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
+ this.fileSystemView = getFileSystemView(engineContext,
+ metaClient, metadataConfig, viewConf, commonConfig);
+ this.tableMetadata = HoodieTableMetadata.create(
+ engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePath().toString());
+ if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
+ this.allColumnNameList = getAllColumnNames();
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Failed to initialize metadata validation context for " + metaClient.getBasePath(), e);
}
}
@@ -1462,6 +1585,14 @@ public class HoodieMetadataTableValidator implements Serializable {
return metaClient;
}
+ public HoodieMetadataConfig getMetadataConfig() {
+ return metadataConfig;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
public HoodieTableMetadata getTableMetadata() {
return tableMetadata;
}
@@ -1483,10 +1614,10 @@ public class HoodieMetadataTableValidator implements Serializable {
@SuppressWarnings({"rawtypes", "unchecked"})
public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(
- String partitionPath, List<String> baseFileNameList) {
+ String partitionPath, List<String> fileNames) {
LOG.info("All column names for getting column stats: {}", allColumnNameList);
if (enableMetadataTable) {
- List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+ List<Pair<String, String>> partitionFileNameList = fileNames.stream()
.map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
return allColumnNameList.stream()
.flatMap(columnName ->
@@ -1499,7 +1630,7 @@ public class HoodieMetadataTableValidator implements Serializable {
} else {
FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(metaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET);
- return baseFileNameList.stream().flatMap(filename ->
+ return fileNames.stream().flatMap(filename ->
formatUtils.readColumnStatsFromMetadata(
metaClient.getStorage(),
new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath), filename),
@@ -1533,9 +1664,8 @@ public class HoodieMetadataTableValidator implements Serializable {
}
private List<String> getAllColumnNames() {
- TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
try {
- return schemaResolver.getTableAvroSchema().getFields().stream()
+ return schema.getFields().stream()
.map(Schema.Field::name).collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
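The heart of the new check above is the RDD set difference in validatePartitionStats: records loaded from the partition stats index must exactly match the records re-derived by aggregating the column stats index, so their subtract() must be empty. Below is a stripped-down sketch of that pattern, with strings standing in for HoodieMetadataColumnStats records; all names and values are illustrative only. Note that subtract() is one-directional: like the validator, it reports records present in the first RDD that are missing from the second.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SubtractCheckDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("subtract-check-demo");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Stand-in for stats read from the partition_stats index ...
      JavaRDD<String> fromPartitionStatsIndex = jsc.parallelize(Arrays.asList("p1/col1:min=1,max=10"));
      // ... and for stats rebuilt by aggregating the column_stats index.
      JavaRDD<String> rebuiltFromColStats = jsc.parallelize(Arrays.asList("p1/col1:min=1,max=10"));
      // An empty difference means the two indexes agree, as the validator requires.
      JavaRDD<String> diff = fromPartitionStatsIndex.subtract(rebuiltFromColStats);
      if (!diff.isEmpty()) {
        throw new IllegalStateException("Partition stats mismatch: " + diff.collect());
      }
    }
  }
}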
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 2181d7a8f12..5e0d69899a1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
@@ -61,10 +62,12 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -97,6 +100,47 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
);
}
+ @Test
+ public void testAggregateColumnStats() {
+ HoodieColumnRangeMetadata<Comparable> fileColumn1Range1 = HoodieColumnRangeMetadata.<Comparable>create(
+ "path/to/file1", "col1", 1, 5, 0, 10, 100, 200);
+ HoodieColumnRangeMetadata<Comparable> fileColumn1Range2 = HoodieColumnRangeMetadata.<Comparable>create(
+ "path/to/file1", "col1", 1, 10, 5, 10, 100, 200);
+ HoodieColumnRangeMetadata<Comparable> fileColumn2Range1 = HoodieColumnRangeMetadata.<Comparable>create(
+ "path/to/file1", "col2", 3, 8, 1, 15, 120, 250);
+ HoodieColumnRangeMetadata<Comparable> fileColumn2Range2 = HoodieColumnRangeMetadata.<Comparable>create(
+ "path/to/file1", "col2", 5, 9, 4, 5, 80, 150);
+ List<HoodieColumnRangeMetadata<Comparable>> colStats = new ArrayList<>();
+ colStats.add(fileColumn1Range1);
+ colStats.add(fileColumn1Range2);
+ colStats.add(fileColumn2Range1);
+ colStats.add(fileColumn2Range2);
+
+ int col1Count = 0;
+ int col2Count = 0;
+ // Ensure merge logic for column stats is correct and aggregate logic creates two entries for two columns
+ TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedStats = HoodieMetadataTableValidator.aggregateColumnStats("path/to/file1", colStats);
+ assertEquals(2, aggregatedStats.size());
+ for (HoodieColumnRangeMetadata<Comparable> stat : aggregatedStats) {
+ if (stat.getColumnName().equals("col1")) {
+ assertEquals(1, stat.getMinValue());
+ assertEquals(10, stat.getMaxValue());
+ col1Count++;
+ } else if (stat.getColumnName().equals("col2")) {
+ assertEquals(3, stat.getMinValue());
+ assertEquals(9, stat.getMaxValue());
+ col2Count++;
+ }
+
+ assertEquals(5, stat.getNullCount());
+ assertEquals(20, stat.getValueCount());
+ assertEquals(200, stat.getTotalSize());
+ assertEquals(400, stat.getTotalUncompressedSize());
+ }
+ assertEquals(1, col1Count);
+ assertEquals(1, col2Count);
+ }
+
@ParameterizedTest
@MethodSource("viewStorageArgs")
public void testMetadataTableValidation(String viewStorageTypeForFSListing, String viewStorageTypeForMDTListing) {
@@ -139,6 +183,50 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
assertTrue(validator.getThrowables().isEmpty());
}
+ @Test
+ public void testPartitionStatsValidation() {
+ // TODO: Add validation for compaction and clustering cases
+ Map<String, String> writeOptions = new HashMap<>();
+ writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+ writeOptions.put("hoodie.table.name", "test_table");
+ writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+ writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+ writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+ writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+ writeOptions.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
+
+ Dataset<Row> inserts = makeInsertDf("000", 5);
+ inserts.write().format("hudi").options(writeOptions)
+ .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+ .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true")
+ .mode(SaveMode.Overwrite)
+ .save(basePath);
+ // validate MDT partition stats
+ validatePartitionStats();
+
+ Dataset<Row> updates = makeUpdateDf("001", 5);
+ updates.write().format("hudi").options(writeOptions)
+ .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+ .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true")
+ .mode(SaveMode.Append)
+ .save(basePath);
+
+ // validate MDT partition stats
+ validatePartitionStats();
+ }
+
+ private void validatePartitionStats() {
+ HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+ config.basePath = basePath;
+ config.validateLatestFileSlices = false;
+ config.validateAllFileGroups = false;
+ config.validatePartitionStats = true;
+ HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+ assertTrue(validator.run());
+ assertFalse(validator.hasValidationFailure());
+ assertTrue(validator.getThrowables().isEmpty());
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {