This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b6465081af [HUDI-8260] Fix col stats metadata validation so that log files are also validated (#12159)
2b6465081af is described below
commit 2b6465081affe85d1703246b27409aebe7589d8e
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Oct 28 18:55:34 2024 +0530
[HUDI-8260] Fix col stats metadata validation so that log files are also validated (#12159)
- Fix the validation logic so that log files are also validated along with base files.
- Also, if col stats validation fails, partition stats validation can be skipped, since partition stats validation depends on col stats validation.
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/metadata/HoodieTableMetadataUtil.java | 8 +-
.../utilities/HoodieMetadataTableValidator.java | 90 ++++++++++++++--------
.../TestHoodieMetadataTableValidator.java | 42 ++++++++++
3 files changed, 103 insertions(+), 37 deletions(-)
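In essence, the fix makes the validator collect log file names alongside base file names before comparing column stats, and (per the notes above) lets partition stats validation be skipped when column stats validation has already failed, since the former is derived from the latter. A minimal sketch of the file-collection idea, using a stand-in Slice type rather than Hudi's actual FileSlice:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only; Slice stands in for Hudi's FileSlice
    // (one base file plus any number of log files).
    final class ColStatsValidationSketch {
      record Slice(String baseFileName, List<String> logFileNames) {}

      // Before this patch only base file names were collected, so column
      // stats for log files were never checked against the metadata table.
      static List<String> filesToValidate(List<Slice> latestSlices) {
        List<String> names = new ArrayList<>();
        for (Slice slice : latestSlices) {
          names.add(slice.baseFileName());     // base file, validated as before
          names.addAll(slice.logFileNames());  // log files, newly included
        }
        return names;
      }
    }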
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 765d819b09a..146faab672c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1275,11 +1275,9 @@ public class HoodieTableMetadataUtil {
* Read column range metadata from log file.
*/
@VisibleForTesting
- protected static List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeMetadata(String filePath,
-                                                                                            HoodieTableMetaClient datasetMetaClient,
-                                                                                            List<String> columnsToIndex,
-                                                                                            Option<Schema> writerSchemaOpt,
-                                                                                            int maxBufferSize) throws IOException {
+ public static List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient datasetMetaClient,
+                                                                                         List<String> columnsToIndex, Option<Schema> writerSchemaOpt,
+                                                                                         int maxBufferSize) throws IOException {
if (writerSchemaOpt.isPresent()) {
List<Schema.Field> fieldsToIndex = writerSchemaOpt.get().getFields().stream()
.filter(field -> columnsToIndex.contains(field.name()))
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5e2dd23d5e5..8bb7424bb72 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -80,7 +80,6 @@ import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -131,6 +130,8 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getLocationFromRecordIndexInfo;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getLogFileColumnRangeMetadata;
/**
* TODO: [HUDI-8294]
@@ -806,12 +807,12 @@ public class HoodieMetadataTableValidator implements Serializable {
* @param metadataTableBasedContext Validation context containing information based on metadata table
* @param fsBasedContext            Validation context containing information based on the file system
* @param partitionPath             Partition path String
- * @param baseDataFilesForCleaning   Base files for un-complete cleaner action
+ * @param baseDataFilesForCleaning Base files for un-complete cleaner action
*/
private void validateFilesInPartition(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath,
- Set<String> baseDataFilesForCleaning) {
+ Set<String> baseDataFilesForCleaning) throws Exception {
if (cfg.validateLatestFileSlices) {
validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
}
@@ -936,7 +937,7 @@ public class HoodieMetadataTableValidator implements Serializable {
fsBasedContext.getMetaClient(), "latest file slices");
}
- private List<FileSlice>
filterFileSliceBasedOnInflightCleaning(List<FileSlice>
sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
+ private static List<FileSlice>
filterFileSliceBasedOnInflightCleaning(List<FileSlice>
sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
return sortedLatestFileSliceList.stream()
.filter(fileSlice -> {
if (!fileSlice.getBaseFile().isPresent()) {
@@ -955,17 +956,12 @@ public class HoodieMetadataTableValidator implements Serializable {
}
@SuppressWarnings("rawtypes")
- private void validateAllColumnStats(
- HoodieMetadataValidationContext metadataTableBasedContext,
- HoodieMetadataValidationContext fsBasedContext,
- String partitionPath,
- Set<String> baseDataFilesForCleaning) {
+ private void validateAllColumnStats(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext,
+                                     String partitionPath, Set<String> baseDataFilesForCleaning) throws Exception {
- List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
- List<HoodieColumnRangeMetadata<Comparable>> metadataBasedColStats = metadataTableBasedContext
-     .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
- List<HoodieColumnRangeMetadata<Comparable>> fsBasedColStats = fsBasedContext
-     .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
+ List<String> latestFileNames = getLatestFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
+ List<HoodieColumnRangeMetadata<Comparable>> metadataBasedColStats = metadataTableBasedContext.getSortedColumnStatsList(partitionPath, latestFileNames, metadataTableBasedContext.getSchema());
+ List<HoodieColumnRangeMetadata<Comparable>> fsBasedColStats = fsBasedContext.getSortedColumnStatsList(partitionPath, latestFileNames, fsBasedContext.getSchema());
validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
}
@@ -1003,12 +999,8 @@ public class HoodieMetadataTableValidator implements Serializable {
List<FileSlice> latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
List<String> latestFileNames = new ArrayList<>();
- latestFileSlicesFromMetadataTable.stream().filter(fs -> fs.getBaseFile().isPresent()).forEach(fs -> {
-   latestFileNames.add(fs.getBaseFile().get().getFileName());
-   latestFileNames.addAll(fs.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
- });
- List<HoodieColumnRangeMetadata<Comparable>> colStats = metadataTableBasedContext
-     .getSortedColumnStatsList(partitionPath, latestFileNames);
+ latestFileSlicesFromMetadataTable.stream().filter(fs -> fs.getBaseFile().isPresent()).forEach(fs -> getLatestFiles(fs, latestFileNames));
+ List<HoodieColumnRangeMetadata<Comparable>> colStats = metadataTableBasedContext.getSortedColumnStatsList(partitionPath, latestFileNames, metadataTableBasedContext.getSchema());
TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = aggregateColumnStats(partitionPath, colStats);
// TODO: fix `isTightBound` flag when stats based on log files are available
@@ -1030,6 +1022,11 @@ public class HoodieMetadataTableValidator implements Serializable {
});
}
+ private static void getLatestFiles(FileSlice fs, List<String> latestFileNames) {
+   latestFileNames.add(fs.getBaseFile().get().getFileName());
+   latestFileNames.addAll(fs.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+ }
+
/**
* Generates aggregated column stats which also signify as partition stat for the particular partition
* path.
@@ -1308,7 +1305,7 @@ public class HoodieMetadataTableValidator implements Serializable {
functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
.toJavaRDD()
.map(row -> {
- HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+ HoodieRecordGlobalLocation location = getLocationFromRecordIndexInfo(
row.getString(row.fieldIndex("partitionName")),
row.getInt(row.fieldIndex("fileIdEncoding")),
row.getLong(row.fieldIndex("fileIdHighBits")),
@@ -1342,6 +1339,18 @@ public class HoodieMetadataTableValidator implements Serializable {
return sb.toString();
}
+ private static List<String> getLatestFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
+   List<String> latestFileNames = new ArrayList<>();
+   List<FileSlice> latestFileSlices;
+   if (!baseDataFilesForCleaning.isEmpty()) {
+     latestFileSlices = filterFileSliceBasedOnInflightCleaning(fsBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
+   } else {
+     latestFileSlices = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
+   }
+   latestFileSlices.forEach(fileSlice -> getLatestFiles(fileSlice, latestFileNames));
+   return latestFileNames;
+ }
+
private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
List<String> latestBaseFilenameList;
if (!baseDataFilesForCleaning.isEmpty()) {
@@ -1714,8 +1723,7 @@ public class HoodieMetadataTableValidator implements Serializable {
}
@SuppressWarnings({"rawtypes", "unchecked"})
- public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(
-     String partitionPath, List<String> fileNames) {
+ public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(String partitionPath, List<String> fileNames, Schema readerSchema) throws Exception {
LOG.info("All column names for getting column stats: {}", allColumnNameList);
if (enableMetadataTable) {
List<Pair<String, String>> partitionFileNameList = fileNames.stream()
@@ -1729,15 +1737,33 @@ public class HoodieMetadataTableValidator implements Serializable {
.sorted(new HoodieColumnRangeMetadataComparator())
.collect(Collectors.toList());
} else {
- FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(metaClient.getStorage())
-     .getFileFormatUtils(HoodieFileFormat.PARQUET);
- return fileNames.stream().flatMap(filename ->
-     formatUtils.readColumnStatsFromMetadata(
-         metaClient.getStorage(),
-         new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath), filename),
-         allColumnNameList).stream())
-     .sorted(new HoodieColumnRangeMetadataComparator())
-     .collect(Collectors.toList());
+ FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET);
+ return fileNames.stream().flatMap(filename -> {
+   if (filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+     return formatUtils.readColumnStatsFromMetadata(
+         metaClient.getStorage(),
+         new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath), filename),
+         allColumnNameList
+     ).stream();
+   } else {
+     StoragePath storagePartitionPath = new StoragePath(metaClient.getBasePath(), partitionPath);
+     String filePath = new StoragePath(storagePartitionPath, filename).toString();
+     try {
+       return ((List<HoodieColumnRangeMetadata<Comparable>>) getLogFileColumnRangeMetadata(filePath, metaClient, allColumnNameList, Option.of(readerSchema),
+           metadataConfig.getMaxReaderBufferSize())
+           .stream()
+           // We need to convert file path and use only the file name instead of the complete file path
+           .map(m -> (HoodieColumnRangeMetadata<Comparable>) HoodieColumnRangeMetadata.create(filename, m.getColumnName(), m.getMinValue(), m.getMaxValue(),
+               m.getNullCount(), m.getValueCount(), m.getTotalSize(), m.getTotalUncompressedSize()))
+           .collect(Collectors.toList()))
+           .stream();
+     } catch (IOException e) {
+       throw new HoodieIOException(String.format("Failed to get column stats for file: %s", filePath), e);
+     }
+   }
+ })
+     .sorted(new HoodieColumnRangeMetadataComparator())
+     .collect(Collectors.toList());
}
}
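One subtlety in the log file branch above: getLogFileColumnRangeMetadata keys its results by the full file path, while the metadata table side keys column stats by file name alone, which is why each record is rebuilt via HoodieColumnRangeMetadata.create(filename, ...) before the sorted comparison. A simplified illustration of that re-keying, with a stand-in Stats type rather than Hudi's HoodieColumnRangeMetadata:

    import java.util.List;
    import java.util.stream.Collectors;

    final class RekeySketch {
      // Stand-in for HoodieColumnRangeMetadata: column stats keyed by a file identifier.
      record Stats(String fileKey, String column, Object min, Object max) {}

      // Re-key stats read from e.g. "/base/partition/file.log.1" to "file.log.1"
      // so both sides of the validation sort and compare on the same key.
      static List<Stats> rekeyToFileName(List<Stats> statsFromLogFile, String fileName) {
        return statsFromLogFile.stream()
            .map(s -> new Stats(fileName, s.column(), s.min(), s.max()))
            .collect(Collectors.toList());
      }
    }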
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 0292665e974..a232ca32324 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -369,6 +369,36 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
return rows;
}
+ @ParameterizedTest
+ @ValueSource(strings = {"MERGE_ON_READ", "COPY_ON_WRITE"})
+ public void testColumnStatsValidation(String tableType) {
+ Map<String, String> writeOptions = new HashMap<>();
+ writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+ writeOptions.put("hoodie.table.name", "test_table");
+ writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
+ writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+ writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+ writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+ writeOptions.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
+
+ Dataset<Row> inserts = makeInsertDf("000", 5);
+ inserts.write().format("hudi").options(writeOptions)
+     .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+ .mode(SaveMode.Overwrite)
+ .save(basePath);
+ // validate MDT column stats
+ validateColumnStats();
+
+ Dataset<Row> updates = makeUpdateDf("001", 5);
+ updates.write().format("hudi").options(writeOptions)
+     .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+ .mode(SaveMode.Append)
+ .save(basePath);
+
+ // validate MDT column stats
+ validateColumnStats();
+ }
+
@ParameterizedTest
@ValueSource(strings = {"MERGE_ON_READ", "COPY_ON_WRITE"})
public void testPartitionStatsValidation(String tableType) {
@@ -405,6 +435,18 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
validatePartitionStats();
}
+ private void validateColumnStats() {
+ HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+ config.basePath = basePath;
+ config.validateLatestFileSlices = false;
+ config.validateAllFileGroups = false;
+ config.validateAllColumnStats = true;
+ HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+ assertTrue(validator.run());
+ assertFalse(validator.hasValidationFailure());
+ assertTrue(validator.getThrowables().isEmpty());
+ }
+
private void validatePartitionStats() {
HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
config.basePath = basePath;