This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 070c600d79c [HUDI-8348] Whenever any new file slice is added, do the 
full computation of stats for the partitions touched (#12121)
070c600d79c is described below

commit 070c600d79ce5714c8787d423d913615add60fcc
Author: Lokesh Jain <[email protected]>
AuthorDate: Sat Oct 19 20:13:22 2024 +0530

    [HUDI-8348] Whenever any new file slice is added, do the full computation 
of stats for the partitions touched (#12121)
    
    * [HUDI-8348] Whenever any new file slice is added, do the full computation 
of stats for the partitions touched
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/common/config/HoodieMetadataConfig.java   |  20 ++--
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  29 +++++-
 .../TestPartitionStatsIndexWithSql.scala           | 101 ++++++++++++++++++++-
 .../utilities/HoodieMetadataTableValidator.java    |   4 +-
 .../TestHoodieMetadataTableValidator.java          |  10 +-
 5 files changed, 144 insertions(+), 20 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index c84723db8ed..332c26f6908 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -358,14 +358,14 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating partition stats 
index.");
 
-  public static final ConfigProperty<Boolean> 
ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND = ConfigProperty
-      .key(METADATA_PREFIX + ".index.partition.stats.tightBound.enable")
-      .defaultValue(true)
+  public static final ConfigProperty<Boolean> 
PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE = ConfigProperty
+      .key(METADATA_PREFIX + 
".index.partition.stats.consolidate.on.every.write")
+      .defaultValue(false)
       .sinceVersion("1.0.0")
-      .withDocumentation("Enable tight bound for the min/max value for each 
column at the storage partition level. "
-          + "Typically, the min/max range for each column can become wider 
(i.e. the minValue is <= all valid values "
+      .withDocumentation("When enabled, partition stats are consolidated and 
computed on every commit for the min/max value of every column "
+          + "at the storage partition level. Typically, the min/max range for 
each column can become wider (i.e. the minValue is <= all valid values "
           + "in the file, and the maxValue >= all valid values in the file) 
with updates and deletes. If this config is enabled, "
-          + "the min/max range will be updated to the tight bound of the valid 
values in the latest snapshot of the partition.");
+          + "the min/max range will be updated to the tight bound of the valid 
values after every commit for the partitions touched.");
 
   public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = 
ConfigProperty
       .key(METADATA_PREFIX + ".index.secondary.enable")
@@ -534,8 +534,8 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getInt(PARTITION_STATS_INDEX_PARALLELISM);
   }
 
-  public boolean isPartitionStatsIndexTightBoundEnabled() {
-    return getBooleanOrDefault(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND);
+  public boolean isPartitionStatsIndexConsolidationEnabledOnEveryWrite() {
+    return 
getBooleanOrDefault(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE);
   }
 
   public boolean isSecondaryIndexEnabled() {
@@ -749,8 +749,8 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       return this;
     }
 
-    public Builder withPartitionStatsIndexTightBound(boolean enable) {
-      metadataConfig.setValue(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND, 
String.valueOf(enable));
+    public Builder 
withPartitionStatsIndexConsolidationEnabledOnEveryWrite(boolean enable) {
+      
metadataConfig.setValue(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE, 
String.valueOf(enable));
       return this;
     }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 21d7518669d..68042be8cfd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -2201,7 +2201,7 @@ public class HoodieTableMetadataUtil {
 
       int parallelism = Math.max(Math.min(partitionedWriteStats.size(), 
metadataConfig.getPartitionStatsIndexParallelism()), 1);
       boolean shouldScanColStatsForTightBound = 
MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
-          && metadataConfig.isPartitionStatsIndexTightBoundEnabled() && 
WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+          && 
(metadataConfig.isPartitionStatsIndexConsolidationEnabledOnEveryWrite() || 
shouldConsolidatePartitionStats(commitMetadata, dataMetaClient));
       HoodieTableMetadata tableMetadata;
       if (shouldScanColStatsForTightBound) {
         tableMetadata = HoodieTableMetadata.create(engineContext, 
dataMetaClient.getStorage(), metadataConfig, 
dataMetaClient.getBasePath().toString());
@@ -2218,7 +2218,7 @@ public class HoodieTableMetadataUtil {
           checkState(tableMetadata != null, "tableMetadata should not be null 
when scanning metadata table");
           // Collect Column Metadata for Each File part of active file system 
view of latest snapshot
           // Get all file names, including log files, in a set from the file 
slices
-          Set<String> fileNames = getPartitionFileSlices(dataMetaClient, 
Option.empty(), partitionName, true).stream()
+          Set<String> fileNames = 
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), 
partitionName).stream()
               .flatMap(fileSlice -> Stream.concat(
                   
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
                   fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
@@ -2246,6 +2246,31 @@ public class HoodieTableMetadataUtil {
     }
   }
 
+  private static boolean shouldConsolidatePartitionStats(HoodieCommitMetadata 
commitMetadata, HoodieTableMetaClient metaClient) {
+    if 
(WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType()))
 {
+      return true;
+    }
+    // if not for compaction or clustering, let's check for any new base file 
added.
+    HoodieTableFileSystemView fsv = getFileSystemView(metaClient);
+    try {
+      fsv.loadPartitions(new 
ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()));
+      // Collect already existing filegroupIds.
+      Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+      commitMetadata.getPartitionToWriteStats().keySet().forEach(partition -> {
+        partitionToExistingFileIds.put(partition, 
fsv.getLatestBaseFiles(partition).map(baseFile -> 
baseFile.getFileId()).collect(Collectors.toList()));
+      });
+      // check if a new base file is added to any of the existing file groups.
+      // as of now, if any one partition has a new base file added, we are 
triggering tighter bound computation for all partitions touched in this commit.
+      return 
commitMetadata.getPartitionToWriteStats().entrySet().stream().filter(entry -> {
+        List<String> existingFileIds = 
partitionToExistingFileIds.get(entry.getKey());
+        return entry.getValue().stream().filter(writeStat -> 
writeStat.getPath().contains(".parquet"))
+            .filter(writeStat -> 
existingFileIds.contains(writeStat.getFileId())).findAny().isPresent();
+      }).findAny().isPresent();
+    } finally {
+      fsv.close();
+    }
+  }
+
   /**
    * Generate key prefixes for each combination of column name in {@param 
columnsToIndex} and {@param partitionName}.
    */
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 338600b8204..05b480c3542 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -278,6 +278,99 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
     }
   }
 
+  /**
+   * Test the partition stats consolidation:
+   *
+   * 1. Insert values: Initially insert records with values that create wide 
bounds for the price field across different partitions.
+   * 2. Update to widen the bounds: Perform updates to increase the price 
values, causing the min/max stats to widen.
+   * 3. Update to remove or lower the max value: Delete or update the record 
that holds the max value to simulate a
+   * scenario where the bounds need to be tightened.
+   * 4. Trigger stats recomputation: Assuming the config for tighter bounds is 
enabled, validate that the partition stats
+   * have adjusted correctly after scanning and recomputing accurate stats.
+   */
+  test("Test partition stats index with tight bound") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq("true", "false").foreach { 
isPartitionStatsIndexConsolidationEnabledOnEveryWrite =>
+        withTempDir { tmp =>
+          val tableName = generateTableName + 
s"_tight_bound_$isPartitionStatsIndexConsolidationEnabledOnEveryWrite"
+          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price int,
+               |  ts long
+               |) using hudi
+               |partitioned by (ts)
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'price',
+               |  hoodie.metadata.index.partition.stats.enable = 'true',
+               |  
hoodie.metadata.index.partition.stats.consolidate.on.every.write = 
'$isPartitionStatsIndexConsolidationEnabledOnEveryWrite',
+               |  hoodie.metadata.index.column.stats.enable = 'true',
+               |  hoodie.metadata.index.column.stats.column.list = 'price'
+               |)
+               |location '$tablePath'
+               |""".stripMargin
+          )
+
+          /**
+           * Insert: Insert values for price across multiple partitions 
(ts=10, ts=20, ts=30):
+           *
+           * Partition ts=10: price = 1000, 1500
+           * Partition ts=20: price = 2000, 2500
+           * Partition ts=30: price = 3000, 3500
+           *
+           * This will initialize the partition stats with bounds like [1000, 
1500], [2000, 2500], and [3000, 3500].
+           */
+          spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2, 
'a2', 1500, 10)")
+          spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4, 
'a4', 2500, 20)")
+          spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6, 
'a6', 3500, 30)")
+
+          // validate partition stats initialization
+          checkAnswer(s"select key, 
ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') 
where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='price'")(
+            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
+            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
+            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
+          )
+
+          // First Update (widen the bounds): Update the price in partition 
ts=30, where price = 4000 for id=6.
+          //                                  This will widen the max bounds 
in ts=30 from 3500 to 4000.
+          spark.sql(s"update $tableName set price = 4000 where id = 6")
+          // Validate widened stats
+          checkAnswer(s"select key, 
ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound 
from hudi_metadata('$tableName') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='price'")(
+            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+            // for COW table, the stats are consolidated on every write as 
there is a new slice for the same filegroup
+            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000, 
isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean || tableType == 
"cow")
+          )
+          // verify file pruning
+          var metaClient = HoodieTableMetaClient.builder()
+            .setBasePath(tablePath)
+            .setConf(HoodieTestUtils.getDefaultStorageConf)
+            .build()
+          
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true"),
+            GreaterThan(AttributeReference("price", IntegerType)(), 
Literal(3000)), metaClient, isDataSkippingExpected = true)
+
+          // Second update (reduce max value)
+          spark.sql(s"delete from $tableName where id = 6")
+          // Validate that stats have recomputed and tightened
+          checkAnswer(s"select key, 
ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound 
from hudi_metadata('$tableName') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='price'")(
+            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3000, true) 
// tighter bound, note that record with prev max was deleted
+          )
+          // verify file pruning
+          metaClient = HoodieTableMetaClient.reload(metaClient)
+          
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true"),
+            GreaterThan(AttributeReference("price", IntegerType)(), 
Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected = 
true)
+        }
+      }
+    }
+  }
+
   /**
    * 1. Create MOR table with compaction enabled.
    * 2. Do an insert and validate the partition stats index initialization.
@@ -352,8 +445,8 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
 
       // validate partition stats index
       checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound 
from hudi_metadata('$tableName') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='price'")(
-        Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, false),
-        Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, false),
+        Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, true),
+        Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, true),
         Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4003, false)
       )
     }
@@ -398,8 +491,8 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
           if (tableType == "mor" && shouldCompact) {
             // check partition stats records with tightBound
             checkAnswer(s"select key, 
ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound 
from hudi_metadata('$tableName') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='price'")(
-              Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, 
false),
-              Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, 
false),
+              Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, 
true),
+              Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, 
true),
               Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001, 
true)
             )
           }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 4cee0815e46..5459e3359a2 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -980,7 +980,9 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         
AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema()),
 metadataTableBasedContext.getMetadataConfig(),
         metaClient, false);
     HoodieData<HoodieMetadataColumnStats> partitionStats =
-        
partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(),
 scala.Option.empty(), false);
+        
partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(),
 scala.Option.empty(), false)
+            // set isTightBound to false since partition stats generated using 
column stats does not contain the field
+            .map(colStat -> 
HoodieMetadataColumnStats.newBuilder(colStat).setIsTightBound(false).build());
     JavaRDD<HoodieMetadataColumnStats> diffRDD = 
HoodieJavaRDD.getJavaRDD(partitionStats).subtract(HoodieJavaRDD.getJavaRDD(partitionStatsUsingColStats));
     if (!diffRDD.isEmpty()) {
       List<HoodieMetadataColumnStats> diff = diffRDD.collect();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index e0b0255a6f8..9fad1385400 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -288,13 +288,17 @@ public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase
     return rows;
   }
 
-  @Test
-  public void testPartitionStatsValidation() {
+  @ParameterizedTest
+  @ValueSource(strings = {"MERGE_ON_READ", "COPY_ON_WRITE"})
+  public void testPartitionStatsValidation(String tableType) {
     // TODO: Add validation for compaction and clustering cases
     Map<String, String> writeOptions = new HashMap<>();
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put("hoodie.table.name", "test_table");
-    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), 
"MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
+    if (tableType.equals("COPY_ON_WRITE")) {
+      
writeOptions.put(HoodieMetadataConfig.PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE.key(),
 "true");
+    }
     writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), 
"_row_key");
     writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), 
"timestamp");
     writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
"partition_path");

Reply via email to