This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b6465081af [HUDI-8260] Fix col stats metadata validation so that log files are also validated (#12159)
2b6465081af is described below

commit 2b6465081affe85d1703246b27409aebe7589d8e
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Oct 28 18:55:34 2024 +0530

    [HUDI-8260] Fix col stats metadata validation so that log files are also validated (#12159)
    
    - Fix the validation logic so that log files are also validated along with base files (sketched below).
    - Also, if col stats validation fails, partition stats validation can be skipped, since partition stats validation depends on col stats validation.
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
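    
    In essence, the column stats read path now dispatches on file type. A
    simplified fragment of the new logic (the local variable "stats" is
    illustrative; the full code, with casts, error handling, and remapping of
    log-file paths to file names, is in the HoodieMetadataTableValidator hunk
    below):
    
        if (fileName.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
          // Base file: read column stats from the parquet footer metadata.
          stats = formatUtils.readColumnStatsFromMetadata(
              metaClient.getStorage(),
              new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath), fileName),
              allColumnNameList);
        } else {
          // Log file: scan the log blocks and compute the column ranges directly.
          String filePath = new StoragePath(new StoragePath(metaClient.getBasePath(), partitionPath), fileName).toString();
          stats = getLogFileColumnRangeMetadata(filePath, metaClient, allColumnNameList,
              Option.of(readerSchema), metadataConfig.getMaxReaderBufferSize());
        }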
---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  8 +-
 .../utilities/HoodieMetadataTableValidator.java    | 90 ++++++++++++++--------
 .../TestHoodieMetadataTableValidator.java          | 42 ++++++++++
 3 files changed, 103 insertions(+), 37 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 765d819b09a..146faab672c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1275,11 +1275,9 @@ public class HoodieTableMetadataUtil {
    * Read column range metadata from log file.
    */
   @VisibleForTesting
-  protected static List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeMetadata(String filePath,
-                                                                                             HoodieTableMetaClient datasetMetaClient,
-                                                                                             List<String> columnsToIndex,
-                                                                                             Option<Schema> writerSchemaOpt,
-                                                                                             int maxBufferSize) throws IOException {
+  public static List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient datasetMetaClient,
+                                                                                          List<String> columnsToIndex, Option<Schema> writerSchemaOpt,
+                                                                                          int maxBufferSize) throws IOException {
     if (writerSchemaOpt.isPresent()) {
       List<Schema.Field> fieldsToIndex = writerSchemaOpt.get().getFields().stream()
           .filter(field -> columnsToIndex.contains(field.name()))
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5e2dd23d5e5..8bb7424bb72 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -80,7 +80,6 @@ import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -131,6 +130,8 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getLocationFromRecordIndexInfo;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getLogFileColumnRangeMetadata;
 
 /**
  * TODO: [HUDI-8294]
@@ -806,12 +807,12 @@ public class HoodieMetadataTableValidator implements Serializable {
    * @param metadataTableBasedContext Validation context containing information based on metadata table
    * @param fsBasedContext            Validation context containing information based on the file system
    * @param partitionPath             Partition path String
-   * @param baseDataFilesForCleaning    Base files for un-complete cleaner action
+   * @param baseDataFilesForCleaning  Base files for un-complete cleaner action
    */
   private void validateFilesInPartition(
       HoodieMetadataValidationContext metadataTableBasedContext,
       HoodieMetadataValidationContext fsBasedContext, String partitionPath,
-      Set<String> baseDataFilesForCleaning) {
+      Set<String> baseDataFilesForCleaning) throws Exception {
     if (cfg.validateLatestFileSlices) {
       validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
     }
@@ -936,7 +937,7 @@ public class HoodieMetadataTableValidator implements Serializable {
         fsBasedContext.getMetaClient(), "latest file slices");
   }
 
-  private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
+  private static List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
     return sortedLatestFileSliceList.stream()
         .filter(fileSlice -> {
           if (!fileSlice.getBaseFile().isPresent()) {
@@ -955,17 +956,12 @@ public class HoodieMetadataTableValidator implements Serializable {
   }
 
   @SuppressWarnings("rawtypes")
-  private void validateAllColumnStats(
-      HoodieMetadataValidationContext metadataTableBasedContext,
-      HoodieMetadataValidationContext fsBasedContext,
-      String partitionPath,
-      Set<String> baseDataFilesForCleaning) {
+  private void validateAllColumnStats(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext,
+                                      String partitionPath, Set<String> baseDataFilesForCleaning) throws Exception {
 
-    List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
-    List<HoodieColumnRangeMetadata<Comparable>> metadataBasedColStats = metadataTableBasedContext
-        .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
-    List<HoodieColumnRangeMetadata<Comparable>> fsBasedColStats = fsBasedContext
-        .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
+    List<String> latestFileNames = getLatestFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
+    List<HoodieColumnRangeMetadata<Comparable>> metadataBasedColStats = metadataTableBasedContext.getSortedColumnStatsList(partitionPath, latestFileNames, metadataTableBasedContext.getSchema());
+    List<HoodieColumnRangeMetadata<Comparable>> fsBasedColStats = fsBasedContext.getSortedColumnStatsList(partitionPath, latestFileNames, fsBasedContext.getSchema());
 
     validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
   }
@@ -1003,12 +999,8 @@ public class HoodieMetadataTableValidator implements Serializable {
       List<FileSlice> latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath),
           baseDataFilesForCleaning);
       List<String> latestFileNames = new ArrayList<>();
-      latestFileSlicesFromMetadataTable.stream().filter(fs -> fs.getBaseFile().isPresent()).forEach(fs -> {
-        latestFileNames.add(fs.getBaseFile().get().getFileName());
-        latestFileNames.addAll(fs.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
-      });
-      List<HoodieColumnRangeMetadata<Comparable>> colStats = metadataTableBasedContext
-          .getSortedColumnStatsList(partitionPath, latestFileNames);
+      latestFileSlicesFromMetadataTable.stream().filter(fs -> fs.getBaseFile().isPresent()).forEach(fs -> getLatestFiles(fs, latestFileNames));
+      List<HoodieColumnRangeMetadata<Comparable>> colStats = metadataTableBasedContext.getSortedColumnStatsList(partitionPath, latestFileNames, metadataTableBasedContext.getSchema());
 
       TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = aggregateColumnStats(partitionPath, colStats);
       // TODO: fix `isTightBound` flag when stats based on log files are available
@@ -1030,6 +1022,11 @@ public class HoodieMetadataTableValidator implements Serializable {
     });
   }
 
+  private static void getLatestFiles(FileSlice fs, List<String> latestFileNames) {
+    latestFileNames.add(fs.getBaseFile().get().getFileName());
+    latestFileNames.addAll(fs.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+  }
+
   /**
    * Generates aggregated column stats which also signify as partition stat for the particular partition
    * path.
@@ -1308,7 +1305,7 @@ public class HoodieMetadataTableValidator implements Serializable {
             functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
         .toJavaRDD()
         .map(row -> {
-          HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+          HoodieRecordGlobalLocation location = getLocationFromRecordIndexInfo(
               row.getString(row.fieldIndex("partitionName")),
               row.getInt(row.fieldIndex("fileIdEncoding")),
               row.getLong(row.fieldIndex("fileIdHighBits")),
@@ -1342,6 +1339,18 @@ public class HoodieMetadataTableValidator implements Serializable {
     return sb.toString();
   }
 
+  private static List<String> getLatestFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
+    List<String> latestFileNames = new ArrayList<>();
+    List<FileSlice> latestFileSlices;
+    if (!baseDataFilesForCleaning.isEmpty()) {
+      latestFileSlices = filterFileSliceBasedOnInflightCleaning(fsBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
+    } else {
+      latestFileSlices = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
+    }
+    latestFileSlices.forEach(fileSlice -> getLatestFiles(fileSlice, latestFileNames));
+    return latestFileNames;
+  }
+
   private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
     List<String> latestBaseFilenameList;
     if (!baseDataFilesForCleaning.isEmpty()) {
@@ -1714,8 +1723,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(
-        String partitionPath, List<String> fileNames) {
+    public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(String partitionPath, List<String> fileNames, Schema readerSchema) throws Exception {
       LOG.info("All column names for getting column stats: {}", allColumnNameList);
       if (enableMetadataTable) {
         List<Pair<String, String>> partitionFileNameList = fileNames.stream()
@@ -1729,15 +1737,33 @@ public class HoodieMetadataTableValidator implements Serializable {
             .sorted(new HoodieColumnRangeMetadataComparator())
             .collect(Collectors.toList());
       } else {
-        FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(metaClient.getStorage())
-            .getFileFormatUtils(HoodieFileFormat.PARQUET);
-        return fileNames.stream().flatMap(filename ->
-                formatUtils.readColumnStatsFromMetadata(
-                        metaClient.getStorage(),
-                        new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath), filename),
-                        allColumnNameList).stream())
-            .sorted(new HoodieColumnRangeMetadataComparator())
-            .collect(Collectors.toList());
+        FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET);
+        return fileNames.stream().flatMap(filename -> {
+          if (filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+            return formatUtils.readColumnStatsFromMetadata(
+                metaClient.getStorage(),
+                new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath), filename),
+                allColumnNameList
+            ).stream();
+          } else {
+            StoragePath storagePartitionPath = new StoragePath(metaClient.getBasePath(), partitionPath);
+            String filePath = new StoragePath(storagePartitionPath, filename).toString();
+            try {
+              return ((List<HoodieColumnRangeMetadata<Comparable>>) getLogFileColumnRangeMetadata(filePath, metaClient, allColumnNameList, Option.of(readerSchema),
+                  metadataConfig.getMaxReaderBufferSize())
+                  .stream()
+                  // We need to convert file path and use only the file name instead of the complete file path
+                  .map(m -> (HoodieColumnRangeMetadata<Comparable>) HoodieColumnRangeMetadata.create(filename, m.getColumnName(), m.getMinValue(), m.getMaxValue(),
+                      m.getNullCount(), m.getValueCount(), m.getTotalSize(), m.getTotalUncompressedSize()))
+                  .collect(Collectors.toList()))
+                  .stream();
+            } catch (IOException e) {
+              throw new HoodieIOException(String.format("Failed to get column stats for file: %s", filePath), e);
+            }
+          }
+        })
+        .sorted(new HoodieColumnRangeMetadataComparator())
+        .collect(Collectors.toList());
       }
     }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 0292665e974..a232ca32324 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -369,6 +369,36 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     return rows;
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"MERGE_ON_READ", "COPY_ON_WRITE"})
+  public void testColumnStatsValidation(String tableType) {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+    writeOptions.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
+
+    Dataset<Row> inserts = makeInsertDf("000", 5);
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    // validate MDT column stats
+    validateColumnStats();
+
+    Dataset<Row> updates = makeUpdateDf("001", 5);
+    updates.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    // validate MDT column stats
+    validateColumnStats();
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"MERGE_ON_READ", "COPY_ON_WRITE"})
   public void testPartitionStatsValidation(String tableType) {
@@ -405,6 +435,18 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     validatePartitionStats();
   }
 
+  private void validateColumnStats() {
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = false;
+    config.validateAllFileGroups = false;
+    config.validateAllColumnStats = true;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    assertTrue(validator.run());
+    assertFalse(validator.hasValidationFailure());
+    assertTrue(validator.getThrowables().isEmpty());
+  }
+
   private void validatePartitionStats() {
     HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
     config.basePath = basePath;

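To reproduce the validation outside the test suite, here is a minimal
programmatic sketch mirroring the validateColumnStats() helper above (jsc and
tableBasePath are placeholders for an existing JavaSparkContext and the
table's base path):

    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
    config.basePath = tableBasePath;
    config.validateAllColumnStats = true;  // exercises the new log-file-aware path
    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
    boolean ok = validator.run();  // issues surface via hasValidationFailure() and getThrowables()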